From 2c41a7a6b3ed7abd39eec239b8edd788ac22cfea Mon Sep 17 00:00:00 2001 From: Gabriel Ramos Date: Fri, 5 Jun 2026 14:40:05 -0400 Subject: [PATCH] feat: implement binance p2p collector daemon Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage. --- .gitignore | 11 + base_plan.md | 760 ++++++++++++++++++ p2p-collector/Makefile | 36 + p2p-collector/README.md | 103 +++ p2p-collector/alert.py | 80 ++ p2p-collector/binance_client.py | 144 ++++ p2p-collector/collect_p2p.py | 176 ++++ p2p-collector/config.yaml | 32 + p2p-collector/normalizer.py | 99 +++ p2p-collector/requirements.txt | 4 + .../sample_responses/response_buy.json | 44 + .../sample_responses/response_sell.json | 44 + p2p-collector/scheduler.py | 240 ++++++ p2p-collector/storage.py | 102 +++ p2p-collector/tests/test_normalizer.py | 107 +++ p2p-collector/tests/test_storage.py | 72 ++ p2p-collector/tests/test_validator.py | 97 +++ p2p-collector/utils.py | 15 + p2p-collector/validator.py | 147 ++++ 19 files changed, 2313 insertions(+) create mode 100644 .gitignore create mode 100644 base_plan.md create mode 100644 p2p-collector/Makefile create mode 100644 p2p-collector/README.md create mode 100644 p2p-collector/alert.py create mode 100644 p2p-collector/binance_client.py create mode 100644 p2p-collector/collect_p2p.py create mode 100644 p2p-collector/config.yaml create mode 100644 p2p-collector/normalizer.py create mode 100644 p2p-collector/requirements.txt create mode 100644 p2p-collector/sample_responses/response_buy.json create mode 100644 p2p-collector/sample_responses/response_sell.json create mode 100644 p2p-collector/scheduler.py create mode 100644 p2p-collector/storage.py create mode 100644 p2p-collector/tests/test_normalizer.py create mode 100644 p2p-collector/tests/test_storage.py create mode 100644 p2p-collector/tests/test_validator.py create mode 100644 p2p-collector/utils.py create mode 100644 p2p-collector/validator.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8b66b65 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +__pycache__/ +*.py[cod] +*$py.class +.venv/ +venv/ +ENV/ +p2p-collector/data/ +*.log +*.tmp +*.alert +checkpoint.json diff --git a/base_plan.md b/base_plan.md new file mode 100644 index 0000000..2681774 --- /dev/null +++ b/base_plan.md @@ -0,0 +1,760 @@ +# Binance P2P Data Collection — Detailed Implementation Spec + +> **Purpose:** This document is the single source of truth for the data collection phase. Every field, every endpoint, every edge case is specified so a coder can implement without ambiguity. +> +> **Status:** Phase 1 — Data Collection only. No ML. No trading. No algorithm decisions yet. + +--- + +## 1. The Core Loop (Exact Pseudocode) + +``` +while True: + try: + buy_snap = fetch_all_ads(tradeType="BUY", asset="USDT", fiat="VES") + sell_snap = fetch_all_ads(tradeType="SELL", asset="USDT", fiat="VES") + + flat_buy = [normalize_ad(ad, "BUY", now_utc) for ad in buy_snap] + flat_sell = [normalize_ad(ad, "SELL", now_utc) for ad in sell_snap] + + validate_snapshot(flat_buy + flat_sell) + + store_parquet(flat_buy, base_path / "raw" / "buy_ads" / date_partition) + store_parquet(flat_sell, base_path / "raw" / "sell_ads" / date_partition) + + log_success(len(flat_buy), len(flat_sell), elapsed) + + except Exception as e: + log_error(e, consecutive_failures) + consecutive_failures += 1 + if consecutive_failures >= 5: + write_alert_file() # human needs to check + + sleep(jitter(interval_seconds)) # default 300s ± 10% +``` + +--- + +## 2. API Client — Exact Implementation + +### 2.1 Endpoint + +``` +POST https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search +``` + +**No API key.** This is fully public. + +### 2.2 Headers + +| Header | Value | +|---|---| +| `Content-Type` | `application/json` | +| `User-Agent` | `Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36` | +| `Accept` | `*/*` | +| `Origin` | `https://p2p.binance.com` | +| `Referer` | `https://p2p.binance.com/` | + +### 2.3 Request Body (BUY example) + +```json +{ + "asset": "USDT", + "fiat": "VES", + "tradeType": "BUY", + "page": 1, + "rows": 20, + "payTypes": [], + "countries": [], + "publisherType": null, + "classify": "personal", + "filter": {} +} +``` + +**Key notes for the coder:** +- `tradeType: "BUY"` = advertiser wants to **give you VES** in exchange for your USDT. They are *buying* USDT from you. +- `tradeType: "SELL"` = advertiser wants to **give you USDT** in exchange for your VES. They are *selling* USDT to you. +- `payTypes: []` = no filter, return all payment methods +- `rows: 20` = Binance's max per page (do not change) +- `publisherType: null` = both merchants and regular users +- `classify: "personal"` = personal ads (not business) — covers the P2P marketplace + +### 2.4 Pagination Logic + +```python +def fetch_all_ads(trade_type, asset, fiat, max_pages=10): + all_ads = [] + + for page in range(1, max_pages + 1): + body = { + "asset": asset, + "fiat": fiat, + "tradeType": trade_type, + "page": page, + "rows": 20, + "payTypes": [], + "countries": [], + "publisherType": None, + "classify": "personal", + "filter": {} + } + + resp = httpx.post(URL, json=body, headers=HEADERS, timeout=15) + resp.raise_for_status() + + data = resp.json() + + if not data.get("success"): + raise APIError(f"API returned success=false: {data}") + + ads = data.get("data", []) + total = data.get("total", 0) + + all_ads.extend(ads) + + # Stop if we've collected all available ads + if len(all_ads) >= total: + break + + # Don't request a page that starts beyond total ads + if page * 20 >= total: + break + + if page < max_pages: + time.sleep(0.5) # 500ms between pages + + return all_ads +``` + +### 2.5 Rate Limiting — Defensive Strategy + +| Event | Wait time | Notes | +|---|---|---| +| Between pages (same snapshot) | 500 ms | Fixed | +| Between snapshots (BUY → SELL) | 1 second | Fixed | +| Between full cycles | 300 s ± 30s | Jittered to avoid clock sync | +| HTTP 429 (rate limited) | 60s → 120s → 240s → 480s | Exponential backoff, cap at 480s | +| Connection error | 30s retry | Transient network issues | +| 5xx server error | 60s retry | Binance server-side issues | + +**Important:** After a 429, reset the backoff after one successful full snapshot. + +### 2.6 Proxy Support (Optional — keep simple first) + +Start with **no proxy**, direct from VPS. Only add proxy rotation if we hit rate limits. Binance rarely rate-limits P2P at 1 request/5min. + +--- + +## 3. Normalization — Exact Field Mapping + +### 3.1 The Flattened Schema (one row = one ad) + +| # | Output field | Type | JSON path | Notes | +|---|---|---|---|---| +| 1 | `snapshot_id` | string | auto: `{fetch_ts_iso}_{trade_type}` | e.g. `"20260605T133000Z_BUY"` | +| 2 | `fetched_at` | datetime | auto: now_utc | Always UTC | +| 3 | `fetched_date` | string | auto: YYYY-MM-DD | Partition column | +| 4 | `trade_type` | string | `adv.tradeType` | "BUY" or "SELL" | +| 5 | `adv_no` | string | `adv.advNo` | Unique ad ID | +| 6 | `asset` | string | `adv.asset` | "USDT" | +| 7 | `fiat` | string | `adv.fiatUnit` | "VES" | +| 8 | `price` | float | `adv.price` | Parse as float | +| 9 | `surplus_amount` | float | `adv.surplusAmount` | Remaining USDT | +| 10 | `min_amount` | float | `adv.minSingleTransAmount` | Min USDT per trade | +| 11 | `max_amount` | float | `adv.maxSingleTransAmount` | Max USDT per trade | +| 12 | `tradable_quantity` | float | `adv.tradableQuantity` | Same as surplus? | +| 13 | `advertiser_no` | string | `advertiser.userNo` | **Stable ID** — use this | +| 14 | `advertiser_name` | string | `advertiser.nickName` | For reference only | +| 15 | `advertiser_type` | string | `advertiser.userType` | "merchant" or "user" | +| 16 | `month_order_count` | int | `advertiser.monthOrderCount` | | +| 17 | `month_finish_rate` | float | `advertiser.monthFinishRate` | 0.0 to 1.0 | +| 18 | `positive_rate` | float | `advertiser.positiveRate` | 0.0 to 1.0 | +| 19 | `user_positive_rate` | float | `advertiser.userPositiveRate` | older field, same idea | +| 20 | `payment_methods` | list[str] | `adv.tradeMethods[].payType` | e.g. `["BANESCO", "PAGO_MOVIL"]` | +| 21 | `payment_method_ids` | list[str] | `adv.tradeMethods[].identifier` | e.g. `["Banco_Banesco", "Pago_Movil"]` | +| 22 | `ad_created_at` | datetime | `adv.createTime` | Unix millisecond → datetime | +| 23 | `price_type` | string | `adv.priceType` | Usually "FIXED" | + +### 3.2 JSON Path Details (nested structure) + +The API response has this structure: + +```json +{ + "data": [ + { + "adv": { + "advNo": "6f8b2e...", + "tradeType": "BUY", + "asset": "USDT", + "fiatUnit": "VES", + "price": "58.50", + "surplusAmount": "1520.43", + "maxSingleTransAmount": "5000.00", + "minSingleTransAmount": "100.00", + "tradableQuantity": "1520.43", + "createTime": 1749128400000, + "fiatSymbol": "Bs", + "priceType": "FIXED", + "tradeMethods": [ + { + "identifier": "Banco_Banesco", + "payType": "BANESCO", + "payMethodId": "BANESCO" + }, + { + "identifier": "Pago_Movil", + "payType": "PAGO_MOVIL", + "payMethodId": "PAGO_MOVIL" + } + ] + }, + "advertiser": { + "userNo": "ABC123", + "nickName": "CryptoTraderVE", + "userType": "merchant", + "monthOrderCount": 342, + "monthFinishRate": 0.97, + "positiveRate": 0.99, + "userPositiveRate": 0.99 + } + } + ], + "total": 156, + "pageSize": 20, + "success": true +} +``` + +### 3.3 Normalization Code Sketch + +```python +def normalize_ad(raw_ad: dict, trade_type: str, fetched_at: datetime) -> dict: + adv = raw_ad["adv"] + adver = raw_ad["advertiser"] + + payment_methods = [m["payType"] for m in adv.get("tradeMethods", [])] + payment_method_ids = [m["identifier"] for m in adv.get("tradeMethods", [])] + + return { + "snapshot_id": f"{fetched_at.strftime('%Y%m%dT%H%M%SZ')}_{trade_type}", + "fetched_at": fetched_at, + "fetched_date": fetched_at.strftime("%Y-%m-%d"), + "trade_type": trade_type, + "adv_no": adv["advNo"], + "asset": adv["asset"], + "fiat": adv["fiatUnit"], + "price": float(adv["price"]), + "surplus_amount": float(adv.get("surplusAmount", 0)), + "min_amount": float(adv.get("minSingleTransAmount", 0)), + "max_amount": float(adv.get("maxSingleTransAmount", 0)), + "tradable_quantity": float(adv.get("tradableQuantity", 0)), + "advertiser_no": adver["userNo"], + "advertiser_name": adver["nickName"], + "advertiser_type": adver.get("userType", "user"), + "month_order_count": adver.get("monthOrderCount", 0), + "month_finish_rate": float(adver.get("monthFinishRate", 0)), + "positive_rate": float(adver.get("positiveRate", 0)), + "user_positive_rate": float(adver.get("userPositiveRate", 0)), + "payment_methods": payment_methods, # e.g. ["BANESCO", "PAGO_MOVIL"] + "payment_method_ids": payment_method_ids, # e.g. ["Banco_Banesco", "Pago_Movil"] + "ad_created_at": datetime.fromtimestamp( + adv["createTime"] / 1000, tz=timezone.utc + ), + "price_type": adv.get("priceType", "FIXED"), + } +``` + +--- + +## 4. Payment Methods — The Critical Column + +### 4.1 Known Payment Method Identifiers for Venezuela + +| `payType` value | `identifier` value | Common name | +|---|---|---| +| `BANESCO` | `Banco_Banesco` | Banesco bank transfer | +| `MERCANTIL` | `Banco_Mercantil` | Mercantil bank transfer | +| `PROVINCIAL` | `Banco_Provincial` | Banco Provincial (BBVA) | +| `VENEZUELA` | `Banco_De_Venezuela` | Banco de Venezuela (BDV) | +| `BANCO_NACIONAL_CREDITO` | `Banco_Nacional_De_Credito` | BNC | +| `SOFITASA` | `Sofitasa` | Sofitasa | +| `BANCAMIGA` | `Bancamiga` | Bancamiga | +| `BANCO_EXTERIOR` | `Banco_Exterior` | Banco Exterior | +| `BANCO_OCCIDENTE` | `Banco_Occidente` | Banco Occidental de Descuento (BOD) | +| `BANCO_PLATA` | `Banco_Plata` | Banco Plaza | +| `BANESCO_PERSONAL` | `Banesco_Personal` | Banesco personal account | +| `PAGO_MOVIL` | `Pago_Movil` | Mobile payment (inter-bank) | +| `BANCANET` | `Bancanet` | Bancanet | +| `BANPLUS` | `Banplus` | Banplus | +| `ZELLE` | `Zelle` | Zelle (USD, not VES) | +| `PAYPAL` | `Paypal` | PayPal (USD) | +| `CASH_VEF` | `Efectivo_VEF` | Cash in VES | +| `CASH_USD` | `Efectivo_USD` | Cash in USD | +| `PAGO_MOVIL` | `Pago_Movil_Banco_Venezuela` | Mobile payment at specific bank | + +### 4.2 Why This Matters for Bank Arbitrage + +```python +# Example analysis query after ~1 week of data: +# For each snapshot, find the best path: +# +# Best BUY price (sell USDT → get VES): Banesco, 60.50 VES/USDT +# Best SELL price (buy USDT → give VES): Mercantil, 62.30 VES/USDT +# Gross arbitrage: 62.30 - 60.50 = 1.80 VES/USDT = ~2.9% spread +# +# If same bank: you lose 0% on internal transfer +# If different banks: you lose bank transfer fee (maybe 0.5%) +# Net profit = 2.9% - 0.5% = 2.4% per round trip +``` + +### 4.3 Storage Consideration + +`payment_methods` is a **list of strings** — this is fine in Parquet (stored as a repeated field). For CSV it would need to be JSON-encoded or one-hot encoded later. + +--- + +## 5. Storage — Exact File Layout + +``` +/path/to/data/ +├── raw/ +│ ├── buy_ads/ +│ │ └── year=2026/ +│ │ └── month=06/ +│ │ └── day=05/ +│ │ ├── snapshot_20260605_133000.parquet +│ │ ├── snapshot_20260605_133500.parquet +│ │ └── ... +│ ├── sell_ads/ +│ │ └── year=2026/ +│ │ └── month=06/ +│ │ └── day=05/ +│ │ ├── snapshot_20260605_133000.parquet +│ │ └── ... +│ └── daily_merged/ <-- OPTIONAL: daily combined view +│ └── year=2026/ +│ └── month=06/ +│ └── 2026-06-05.parquet +│ +├── logs/ +│ └── collector_20260605.log +│ +├── alerts/ <-- alert marker files go here +│ └── (empty if no issues) +│ +└── checkpoint.json <-- for restart resilience +``` + +### 5.1 File Naming Convention + +**Snapshot files:** `snapshot_{YYYYMMDD}_{HHMMSS}.parquet` +- Time used: the start timestamp of the snapshot (UTC) +- Example: `snapshot_20260605_133000.parquet` + +**Why no UUIDs?** The timestamp + trade_type partition is already unique. No repeated names unless you run two collectors (don't). + +### 5.2 Atomic Writes (No Partial Files) + +```python +def store_parquet(rows, base_dir, fetched_at): + if not rows: + return + + # Build partition path from timestamp + year = fetched_at.strftime("%Y") + month = fetched_at.strftime("%m") + day = fetched_at.strftime("%d") + filename = f"snapshot_{fetched_at.strftime('%Y%m%d_%H%M%S')}.parquet" + + dest_dir = Path(base_dir) / f"year={year}" / f"month={month}" / f"day={day}" + dest_dir.mkdir(parents=True, exist_ok=True) + + # Write to temp file first + tmp_path = dest_dir / (filename + ".tmp") + final_path = dest_dir / filename + + df = pd.DataFrame(rows) + df.to_parquet(tmp_path, index=False, engine="pyarrow") + + # Atomic rename + tmp_path.rename(final_path) +``` + +### 5.3 Schema Consistency Check + +Each snapshot should write a schema marker file once: + +```python +# After first successful write per partition, write schema.parquet as a reference +schema_path = dest_dir / "_schema.parquet" +if not schema_path.exists(): + df.iloc[:0].to_parquet(schema_path) # empty DataFrame with same schema +``` + +This allows downstream readers to discover the schema without reading a full snapshot. + +--- + +## 6. Data Validation During Collection + +### 6.1 Row-Level Rejection Rules + +Reject (skip, don't crash) individual ads if: + +| Condition | Why | Action | +|---|---|---| +| `price` is None or ≤ 0 | Bad data | Log warning, skip | +| `surplusAmount` is None or ≤ 0 | Ad has no USDT left | Log debug, skip | +| `monthFinishRate` is 0.0 and `monthOrderCount` > 0 | Merchant hasn't completed any orders (suspicious) | Log warning, skip | +| `price` < 1.0 or `price` > 500.0 | Way outside VES/USDT normal range (should be ~50–150) | Log warning, skip this ad | +| Empty `advNo` | Missing identifier | Log error, skip | +| Duplicate `advNo` within same snapshot | Possible API glitch | Log warning, keep first occurrence | + +### 6.2 Snapshot-Level Validation + +After collecting all ads for one snapshot: + +``` +✅ TOTAL ADS: BUY=47 SELL=53 (should be 20-200 each) +✅ PRICE RANGE: BUY [54.20 - 62.80] SELL [58.00 - 68.50] + (SELL should be consistently higher than BUY) + If not: LOG WARNING "BUY/SELL overlap detected" +✅ SPREAD: SELL_min - BUY_max = 58.00 - 62.80 = -4.80 + (If negative: spread is inverted — unusual but possible) + Log: "Current spread: {spread:.2f} VES/USDT" +✅ MEDIAN PRICE: BUY=58.30 SELL=63.50 +✅ AD STALENESS: 0 ads with createTime > 7 days old + (If any: they're stale, still keep them, but log it) +✅ EMPTY SNAPSHOT: If BUY=0 AND SELL=0 → CRITICAL ALERT +``` + +### 6.3 Snapshot Summary Log Line (one line per snapshot) + +``` +2026-06-05 13:30:00 UTC | BUY=47 ads [54.20–62.80] SELL=53 ads [58.00–68.50] | spread= -4.80 | took 3.2s | methods=[BANESCO,PAGO_MOVIL,MERCANTIL,...] +``` + +--- + +## 7. Scheduling & Lifecycle + +### 7.1 Startup Behavior + +``` +1. Read checkpoint.json (if exists) + → "last_completed_snapshot": "2026-06-05T13:25:00Z" + → Wait until (last_completed + interval) before starting + → If checkpoint is missing or corrupted, start immediately + +2. Verify data directory is writable + → Try writing a test file, then delete it + +3. Log: "Starting collector. Interval=300s. Pairs=USDT/VES" +``` + +### 7.2 Graceful Shutdown + +```python +import signal + +running = True + +def handle_signal(sig, frame): + global running + logging.info("Received signal %s, finishing current snapshot...", sig) + running = False + +signal.signal(signal.SIGINT, handle_signal) +signal.signal(signal.SIGTERM, handle_signal) + +# In main loop: +while running: + # ... do snapshot ... + # Write checkpoint after each successful snapshot + write_checkpoint({"last_completed_snapshot": now_utc.isoformat()}) +``` + +### 7.3 Checkpoint File Format + +```json +{ + "last_completed_snapshot": "2026-06-05T13:30:00Z", + "last_buy_ad_count": 47, + "last_sell_ad_count": 53, + "consecutive_failures": 0, + "total_snapshots": 284, + "first_snapshot": "2026-06-01T00:00:00Z", + "version": "1.0" +} +``` + +### 7.4 Alert Marker File + +After 5 consecutive failures, write: + +``` +/path/to/data/alerts/20260605_133000_5_failures.alert +``` + +Content: +```json +{ + "timestamp": "2026-06-05T13:30:00Z", + "error": "HTTP 500 after 3 retries", + "consecutive_failures": 5, + "traceback": "..." +} +``` + +--- + +## 8. First-Run Verification Protocol + +After the collector writes its **first snapshot**, the coder should manually verify: + +### Step 1: Read the Parquet file back + +```python +import pandas as pd +df = pd.read_parquet("data/raw/buy_ads/year=2026/month=06/day=05/snapshot_20260605_133000.parquet") +df.info() +df.head() +``` + +Check: +- [ ] All columns present (23 columns from spec) +- [ ] No null values in critical fields (price, adv_no, advertiser_no) +- [ ] `price` is float type, not string +- [ ] `fetched_at` is datetime type +- [ ] `payment_methods` is a proper list column + +### Step 2: Verify BUY vs SELL logic + +```python +buy_ads = df[df["trade_type"] == "BUY"] +sell_ads = df[df["trade_type"] == "SELL"] + +print(f"BUY ads count: {len(buy_ads)}") +print(f"SELL ads count: {len(sell_ads)}") +print(f"BUY price range: {buy_ads['price'].min():.2f} - {buy_ads['price'].max():.2f}") +print(f"SELL price range: {sell_ads['price'].min():.2f} - {sell_ads['price'].max():.2f}") +``` + +Expected: SELL prices are higher than BUY prices (advertiser selling USDT charges a premium vs. buying USDT). + +### Step 3: Verify payment methods are captured + +```python +all_methods = set() +for methods in df["payment_methods"]: + all_methods.update(methods) +print(f"Payment methods found: {sorted(all_methods)}") +``` + +Expected: At least BANESCO and PAGO_MOVIL will appear. Possibly 5–15 different banks. + +### Step 4: Verify advertiser diversity + +```python +print(f"Unique advertisers: {df['advertiser_no'].nunique()}") +print(f"Merchants: {(df['advertiser_type'] == 'merchant').sum()}") +print(f"Users: {(df['advertiser_type'] == 'user').sum()}") +``` + +### Step 5: Run the collector for 1 hour (~12 snapshots) and verify: + +```bash +ls data/raw/buy_ads/year=2026/month=06/day=05/ | wc -l +# Should be ~12 +``` + +- [ ] No duplicate timestamps +- [ ] No gaps > 6 minutes +- [ ] No crash/restart in the logs + +--- + +## 9. File & Module Structure (Exact) + +``` +p2p-collector/ +├── collect_p2p.py # Entry point: argument parsing, main loop +├── config.yaml # All configurable settings +├── binance_client.py # fetch_all_ads(), pagination, rate limiting +├── normalizer.py # normalize_ad(), flatten schema +├── storage.py # store_parquet(), atomic writes, checkpoint +├── validator.py # validate_row(), validate_snapshot() +├── scheduler.py # main loop, sleep/jitter, signal handling +├── alert.py # write_alert_file(), logging setup +├── utils.py # jitter(), datetime helpers +├── requirements.txt # pinned versions +├── Makefile # setup, run, clean, test commands +├── tests/ +│ ├── test_normalizer.py # Test with sample API response +│ ├── test_storage.py # Test atomic writes +│ └── test_validator.py # Test rejection rules +├── sample_responses/ +│ ├── response_buy.json # One real-ish API response for tests +│ └── response_sell.json +└── README.md # Run instructions +``` + +### requirements.txt + +``` +httpx>=0.27,<1.0 +pandas>=2.0,<3.0 +pyarrow>=14.0,<16.0 +pyyaml>=6.0,<7.0 +``` + +Note: `httpx` over `requests` because it has native timeout support, cleaner API. Fall back to `requests` if the coder prefers. + +--- + +## 10. `config.yaml` — Complete Reference + +```yaml +binance: + base_url: "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search" + user_agent: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" + timeout_seconds: 15 + max_pages: 10 + request_delay_seconds: 0.5 + +collection: + pairs: + - asset: "USDT" + fiat: "VES" + interval_seconds: 300 + output_dir: "./data/raw" + retry_attempts: 3 + retry_delay_base_seconds: 10 + +validation: + price_min: 1.0 + price_max: 500.0 + reject_zero_finish_rate: true + reject_zero_surplus: true + +logging: + level: "INFO" + file: "./data/logs/collector.log" + max_bytes: 10485760 # 10 MB + backup_count: 5 + format: "%(asctime)s | %(levelname)s | %(message)s" + +alerts: + consecutive_failure_threshold: 5 + alert_dir: "./data/alerts" +``` + +--- + +## 11. Run Modes + +### Mode 1: One-shot test + +```bash +python collect_p2p.py --once +``` + +- Fetches one BUY snapshot + one SELL snapshot +- Writes to disk +- Prints summary +- Exits +- Used for: first run, testing, debugging + +### Mode 2: Daemon (continuous) + +```bash +python collect_p2p.py +``` + +- Runs forever +- Loop with interval +- Graceful shutdown on Ctrl+C + +### Mode 3: Backfill (future) + +```bash +python collect_p2p.py --backfill --start=2026-06-01 --end=2026-06-03 +``` + +- Not needed now +- Architecture supports it later + +### Mode 4: Validate-only + +```bash +python collect_p2p.py --validate data/raw/buy_ads/year=2026/month=06/day=05/ +``` + +- Reads Parquet files +- Runs validation checks +- Prints report +- No API calls + +--- + +## 12. Testing the Coder's Work + +Hand this checklist to the coder when they say "it's done": + +| # | Test | How | +|---|---|---| +| 1 | **API connectivity** | `python collect_p2p.py --once` returns ads without error | +| 2 | **Pagination works** | Inspect: total ads fetched vs `total` field from API | +| 3 | **Both BUY and SELL** | Both directories have at least one file after `--once` | +| 4 | **Schema correct** | `pd.read_parquet(file)` → 23 columns, correct dtypes | +| 5 | **Payment methods populated** | At least 3 payment methods in the first snapshot | +| 6 | **Atomic write** | Kill the process mid-write (SIGKILL), no partial files remain. Only `.tmp` files | +| 7 | **Graceful shutdown** | Ctrl+C during a snapshot → clean exit, last snapshot saved | +| 8 | **Restart resilience** | Start collector, kill it, restart → resumes without duplicate timestamps | +| 9 | **Rate limiting** | No HTTP 429 in logs after 1 hour of continuous running | +| 10 | **Storage efficiency** | 1 hour of data ≤ 3 MB total on disk | + +--- + +## 13. Post-Collection — What the Data Will Look Like After One Week + +| Metric | Expected value | +|---|---| +| Snapshots collected | ~2,016 (7 days × 288 snapshots/day) | +| Total raw ads | ~200,000–400,000 rows | +| Storage used | ~20–100 MB | +| Unique advertisers | 100–500 | +| Unique payment methods | 10–20 | +| Price range (BUY) | ~55–65 VES/USDT (fluctuates with parallel dollar) | +| Price range (SELL) | ~58–70 VES/USDT | +| Typical spread | ~2–6 VES/USDT (3–10%) | + +**After 1 week of collection, we stop and do EDA before any ML decisions.** + +--- + +## 14. Known Gotchas / FAQ for the Coder + +**Q: What if the API returns different fields than documented?** +A: The normalizer should use `.get()` with defaults for every field. Log a warning if a field is missing that we expected. Don't crash. + +**Q: What if `tradeMethods` is empty?** +A: Some ads have no payment methods listed. Store as empty list `[]`. Continue. This is valid data. + +**Q: What timezone should I use?** +A: **Everything in UTC.** The user is in VET (UTC-4), but all stored timestamps are UTC. Timezone conversion is only for display. + +**Q: What if the VPS reboots?** +A: systemd `Restart=always` handles this. The collector reads the last checkpoint and continues after the appropriate delay. + +**Q: Should I use asyncio?** +A: No. Simple synchronous code. The delay between requests (5 minutes) means async provides zero benefit and adds complexity. + +**Q: Can I use SQLite instead of Parquet?** +A: You could, but Parquet is more storage-efficient and directly loadable into ML frameworks (Pandas, Polars, PyTorch). Stick with Parquet. + +--- + +*End of data collection spec. Hand this to the coding agent as the single source of truth.* diff --git a/p2p-collector/Makefile b/p2p-collector/Makefile new file mode 100644 index 0000000..cdb293a --- /dev/null +++ b/p2p-collector/Makefile @@ -0,0 +1,36 @@ +.PHONY: setup test run run-once clean validate + +VENV = .venv +PYTHON = $(VENV)/bin/python3 +PIP = $(VENV)/bin/pip + +setup: $(VENV)/bin/activate + +$(VENV)/bin/activate: requirements.txt + python3 -m venv $(VENV) + $(PIP) install --upgrade pip + $(PIP) install -r requirements.txt + touch $(VENV)/bin/activate + +test: setup + $(PYTHON) -m unittest discover -s tests -p "test_*.py" + +run: setup + $(PYTHON) collect_p2p.py + +run-once: setup + $(PYTHON) collect_p2p.py --once + +validate: setup + @if [ -z "$(PATH_TO_VALIDATE)" ]; then \ + echo "Usage: make validate PATH_TO_VALIDATE="; \ + exit 1; \ + fi + $(PYTHON) collect_p2p.py --validate $(PATH_TO_VALIDATE) + +clean: + rm -rf $(VENV) + find . -type f -name "*.pyc" -delete + find . -type d -name "__pycache__" -exec rm -rf {} + + find . -type f -name "*.tmp" -delete + @echo "Cleanup complete." diff --git a/p2p-collector/README.md b/p2p-collector/README.md new file mode 100644 index 0000000..9654526 --- /dev/null +++ b/p2p-collector/README.md @@ -0,0 +1,103 @@ +# Binance P2P Data Collector + +This tool continuously collects public peer-to-peer (P2P) market advertisements from Binance P2P for Venezuela (VES/USDT), normalizing, validating, and saving them as atomic date-partitioned Parquet files for subsequent exploratory data analysis and arbitrage modeling. + +## Project Structure + +``` +p2p-collector/ +├── collect_p2p.py # Entry point: argument parsing, validation/daemon modes +├── config.yaml # Application configuration (endpoints, delays, validation limits) +├── binance_client.py # HTTP client, pagination logic, retry, and 429 backoff +├── normalizer.py # Converts raw nested API responses into a flat 23-column schema +├── validator.py # Row-level filtering and snapshot-level integrity checks +├── storage.py # Atomic Parquet writes, schema references, and checkpoints +├── scheduler.py # Loop executor, initial start offsets, signal handling +├── alert.py # Write alert marker files on 5 consecutive failures & logger setup +├── utils.py # Time and sleep/jitter helpers +├── requirements.txt # Package dependencies (httpx, pandas, pyarrow, pyyaml) +├── Makefile # Automation targets (setup, test, run, clean) +└── tests/ # Suite of unit tests for all components +``` + +## Prerequisites + +- **Python 3.8+** (Developed and tested with Python 3.14) +- **Make** (utility for running Makefile targets) + +## Installation & Setup + +Set up the Python virtual environment and install all dependencies: + +```bash +make setup +``` + +## Running the Collector + +### Mode 1: Continuous Daemon Mode +Runs indefinitely, fetching snapshots according to the configured interval (default: 5 minutes) with a ±10% sleep jitter to prevent pattern recognition. Handles graceful shutdown on SIGINT/SIGTERM. + +```bash +make run +``` + +### Mode 2: One-shot Mode (Test/Debug) +Runs exactly one cycle (one BUY snapshot and one SELL snapshot), writes the results to disk, and exits immediately: + +```bash +make run-once +``` + +### Mode 3: Validate-Only Mode +Validates existing Parquet files without making any network calls. It prints statistics (row count, min/max prices, payment methods) and checks for critical schema issues: + +```bash +make validate PATH_TO_VALIDATE=data/raw/buy_ads/year=2026/month=06/day=05/ +``` + +## Running Tests + +Run the test suite to verify the client, normalizer, storage, and validation behaviors: + +```bash +make test +``` + +## Output Directory Structure + +The data is saved under `./data/` folder inside the project root: + +``` +data/ +├── raw/ +│ ├── buy_ads/ +│ │ └── year=YYYY/month=MM/day=DD/ +│ │ ├── _schema.parquet # Empty schema reference +│ │ └── snapshot_YYYYMMDD_HHMMSS.parquet # Atomic snapshot data +│ └── sell_ads/ +│ └── year=YYYY/month=MM/day=DD/ +│ ├── _schema.parquet +│ └── snapshot_YYYYMMDD_HHMMSS.parquet +├── logs/ +│ └── collector.log # Rotating logs +├── alerts/ +│ └── YYYYMMDD_HHMMSS_5_failures.alert # Alert marker JSON file (only on failures) +└── checkpoint.json # Restart resilience marker +``` + +## Checkpoint Format + +A checkpoint file is updated on every successful snapshot, ensuring that restarting the daemon will not query the API until the expected interval has passed: + +```json +{ + "last_completed_snapshot": "2026-06-05T13:30:00Z", + "last_buy_ad_count": 47, + "last_sell_ad_count": 53, + "consecutive_failures": 0, + "total_snapshots": 284, + "first_snapshot": "2026-06-01T00:00:00Z", + "version": "1.0" +} +``` diff --git a/p2p-collector/alert.py b/p2p-collector/alert.py new file mode 100644 index 0000000..69d2edc --- /dev/null +++ b/p2p-collector/alert.py @@ -0,0 +1,80 @@ +import os +import json +import logging +from logging.handlers import RotatingFileHandler +from datetime import datetime + +logger = logging.getLogger(__name__) + +def setup_logging(config: dict) -> None: + """ + Configures the logger using parameters from the config dictionary. + Sets up both a console handler and a rotating file handler. + """ + log_config = config.get("logging", {}) + level_str = log_config.get("level", "INFO") + level = getattr(logging, level_str.upper(), logging.INFO) + + log_file = log_config.get("file", "./data/logs/collector.log") + max_bytes = log_config.get("max_bytes", 10485760) + backup_count = log_config.get("backup_count", 5) + log_format = log_config.get("format", "%(asctime)s | %(levelname)s | %(message)s") + + # Ensure log directory exists + log_dir = os.path.dirname(log_file) + if log_dir: + os.makedirs(log_dir, exist_ok=True) + + # Set up root logger + root_logger = logging.getLogger() + root_logger.setLevel(level) + + # Remove existing handlers to avoid duplicates on re-setup + for handler in list(root_logger.handlers): + root_logger.removeHandler(handler) + + # Console handler + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter(log_format)) + root_logger.addHandler(console_handler) + + # Rotating file handler + try: + file_handler = RotatingFileHandler( + log_file, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8" + ) + file_handler.setFormatter(logging.Formatter(log_format)) + root_logger.addHandler(file_handler) + logger.info(f"Logging configured successfully. Writing to {log_file}") + except Exception as e: + logger.error(f"Failed to set up file logging: {e}") + + +def write_alert_file(config: dict, timestamp: datetime, error_msg: str, consecutive_failures: int, traceback_str: str) -> str: + """ + Writes an alert marker JSON file to the configured alert directory. + Returns the path of the created alert file. + """ + alerts_config = config.get("alerts", {}) + alert_dir = alerts_config.get("alert_dir", "./data/alerts") + os.makedirs(alert_dir, exist_ok=True) + + time_str = timestamp.strftime("%Y%m%d_%H%M%S") + filename = f"{time_str}_{consecutive_failures}_failures.alert" + alert_path = os.path.join(alert_dir, filename) + + content = { + "timestamp": timestamp.isoformat(), + "error": error_msg, + "consecutive_failures": consecutive_failures, + "traceback": traceback_str + } + + try: + with open(alert_path, "w", encoding="utf-8") as f: + json.dump(content, f, indent=4) + logger.error(f"ALERT WRITTEN: {alert_path}") + return alert_path + except Exception as e: + logger.critical(f"Failed to write alert file at {alert_path}: {e}") + return "" diff --git a/p2p-collector/binance_client.py b/p2p-collector/binance_client.py new file mode 100644 index 0000000..04aeb8f --- /dev/null +++ b/p2p-collector/binance_client.py @@ -0,0 +1,144 @@ +import time +import logging +import httpx + +logger = logging.getLogger(__name__) + +class BinanceP2PError(Exception): + """Base exception for Binance P2P client operations.""" + pass + +class RateLimitError(BinanceP2PError): + """Raised when Binance P2P API returns HTTP 429 (Rate Limited).""" + pass + +class APIError(BinanceP2PError): + """Raised when the API returns a response with success=false or invalid structure.""" + pass + +class BinanceP2PClient: + def __init__(self, config: dict): + binance_cfg = config.get("binance", {}) + self.base_url = binance_cfg.get("base_url", "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search") + self.user_agent = binance_cfg.get("user_agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36") + self.timeout = binance_cfg.get("timeout_seconds", 15) + self.max_pages = binance_cfg.get("max_pages", 10) + self.page_delay = binance_cfg.get("request_delay_seconds", 0.5) + + self.headers = { + "Content-Type": "application/json", + "User-Agent": self.user_agent, + "Accept": "*/*", + "Origin": "https://p2p.binance.com", + "Referer": "https://p2p.binance.com/" + } + + # Retry config + collection_cfg = config.get("collection", {}) + self.max_retries = collection_cfg.get("retry_attempts", 3) + self.retry_delay_base = collection_cfg.get("retry_delay_base_seconds", 10) + + # 429 Rate Limiting State + self.current_429_backoff = 60 + self.had_429_this_cycle = False + + def reset_429_backoff(self): + """Resets the 429 backoff delay to its initial value (60s).""" + if self.current_429_backoff != 60: + logger.info("Resetting 429 rate limit backoff to 60s.") + self.current_429_backoff = 60 + + def double_429_backoff(self): + """Doubles the 429 backoff delay, capping at 480s.""" + self.had_429_this_cycle = True + logger.warning(f"Rate limited (429). Setting next backoff delay to {self.current_429_backoff}s.") + time.sleep(self.current_429_backoff) + self.current_429_backoff = min(self.current_429_backoff * 2, 480) + + def _post_request_with_retries(self, body: dict) -> dict: + """ + Executes a POST request to the Binance P2P API with retries for connection + and 5xx errors, and special handling for 429 Rate Limits. + """ + for attempt in range(1, self.max_retries + 1): + try: + # Use httpx.Client for synchronous calls + with httpx.Client(headers=self.headers, timeout=self.timeout) as client: + resp = client.post(self.base_url, json=body) + + # Handle 429 specifically + if resp.status_code == 429: + self.double_429_backoff() + raise RateLimitError("HTTP 429 Rate Limited by Binance.") + + # Handle 5xx server errors + if 500 <= resp.status_code < 600: + logger.warning(f"Binance P2P API returned HTTP {resp.status_code} (attempt {attempt}/{self.max_retries}). Retrying in 60s...") + time.sleep(60) + continue + + # Raise for other HTTP errors (4xx except 429) + resp.raise_for_status() + + # Parse JSON + data = resp.json() + if not data.get("success"): + raise APIError(f"API response success=false: {data}") + + return data + + except (httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout) as e: + logger.warning(f"Connection error occurred: {e} (attempt {attempt}/{self.max_retries}). Retrying in 30s...") + if attempt < self.max_retries: + time.sleep(30) + else: + raise BinanceP2PError(f"Failed to connect after {self.max_retries} attempts: {e}") + except httpx.HTTPStatusError as e: + logger.error(f"HTTP Error {e.response.status_code}: {e.response.text}") + raise BinanceP2PError(f"HTTP Status Error: {e}") + + raise BinanceP2PError("Failed to fetch P2P ads after maximum retries.") + + def fetch_all_ads(self, trade_type: str, asset: str, fiat: str) -> list: + """ + Fetches all P2P advertisements for a given trade type, asset, and fiat, + handling pagination and page-level delays. + """ + all_ads = [] + + for page in range(1, self.max_pages + 1): + body = { + "asset": asset, + "fiat": fiat, + "tradeType": trade_type, + "page": page, + "rows": 20, + "payTypes": [], + "countries": [], + "publisherType": None, + "classify": "personal", + "filter": {} + } + + logger.info(f"Fetching {trade_type} page {page}/{self.max_pages} for {asset}/{fiat}...") + data = self._post_request_with_retries(body) + + ads = data.get("data", []) + total = data.get("total", 0) + + all_ads.extend(ads) + logger.info(f"Retrieved {len(ads)} ads. Total collected so far: {len(all_ads)}/{total}") + + # Stop if we've collected all available ads + if len(all_ads) >= total: + break + + # Don't request a page that starts beyond total ads + if page * 20 >= total: + break + + # Delay between pages + if page < self.max_pages: + time.sleep(self.page_delay) + + return all_ads diff --git a/p2p-collector/collect_p2p.py b/p2p-collector/collect_p2p.py new file mode 100644 index 0000000..6e8daab --- /dev/null +++ b/p2p-collector/collect_p2p.py @@ -0,0 +1,176 @@ +import os +import argparse +import sys +import logging +import yaml +from pathlib import Path +import pandas as pd + +from alert import setup_logging +from scheduler import P2PCollectorScheduler + +logger = logging.getLogger("collect_p2p") + +def load_config(config_path: str) -> dict: + """Loads the YAML configuration file.""" + if not os.path.exists(config_path): + print(f"Error: Config file not found at {config_path}", file=sys.stderr) + sys.exit(1) + try: + with open(config_path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) + except Exception as e: + print(f"Error parsing config file {config_path}: {e}", file=sys.stderr) + sys.exit(1) + + +def validate_parquet_files(path_str: str): + """ + Validates existing Parquet files at the specified path (file or directory). + Prints a report of the contents and validation status. + """ + path = Path(path_str) + if not path.exists(): + print(f"Error: Path does not exist: {path_str}", file=sys.stderr) + sys.exit(1) + + files = [] + if path.is_file(): + if path.suffix == ".parquet": + files.append(path) + elif path.is_dir(): + files = list(path.glob("**/*.parquet")) + + if not files: + print(f"No Parquet files found at {path_str}") + return + + print(f"Validating {len(files)} Parquet file(s)...") + print("-" * 60) + + total_rows = 0 + total_buy = 0 + total_sell = 0 + critical_errors = [] + warnings = [] + + expected_columns = { + "snapshot_id", "fetched_at", "fetched_date", "trade_type", "adv_no", + "asset", "fiat", "price", "surplus_amount", "min_amount", "max_amount", + "tradable_quantity", "advertiser_no", "advertiser_name", "advertiser_type", + "month_order_count", "month_finish_rate", "positive_rate", "user_positive_rate", + "payment_methods", "payment_method_ids", "ad_created_at", "price_type" + } + + for f in sorted(files): + # Skip schema files unless they are specifically targeted + if f.name == "_schema.parquet" and len(files) > 1: + continue + + try: + df = pd.read_parquet(f) + rows = len(df) + total_rows += rows + + print(f"File: {f.name} ({rows} rows)") + + # Check columns + cols = set(df.columns) + missing_cols = expected_columns - cols + extra_cols = cols - expected_columns + if missing_cols: + critical_errors.append(f"{f.name}: Missing expected columns: {missing_cols}") + if extra_cols: + warnings.append(f"{f.name}: Has extra columns: {extra_cols}") + + # Analyze trade types + if "trade_type" in df.columns: + buy_cnt = (df["trade_type"] == "BUY").sum() + sell_cnt = (df["trade_type"] == "SELL").sum() + total_buy += buy_cnt + total_sell += sell_cnt + print(f" Trade types: BUY={buy_cnt}, SELL={sell_cnt}") + + # Check critical nulls + critical_fields = ["price", "adv_no", "advertiser_no"] + for col in critical_fields: + if col in df.columns: + null_cnt = df[col].isnull().sum() + if null_cnt > 0: + critical_errors.append(f"{f.name}: Column '{col}' has {null_cnt} null values") + + # Check types + if "price" in df.columns and not pd.api.types.is_float_dtype(df["price"]): + critical_errors.append(f"{f.name}: Column 'price' is not float type") + + # Prices summary + if "price" in df.columns and rows > 0: + print(f" Price range: [{df['price'].min():.2f} - {df['price'].max():.2f}]") + + # Payment methods + if "payment_methods" in df.columns and rows > 0: + methods = set() + # payment_methods could be list of lists/arrays + for item in df["payment_methods"]: + if isinstance(item, str): + methods.add(item) + elif hasattr(item, "__iter__"): + methods.update(item) + print(f" Payment methods ({len(methods)}): {sorted(list(methods))}") + + except Exception as e: + critical_errors.append(f"{f.name}: Failed to read/validate: {e}") + + print("=" * 60) + print("VALIDATION SUMMARY") + print("=" * 60) + print(f"Total files validated: {len(files)}") + print(f"Total rows: {total_rows}") + print(f"Total BUY ads: {total_buy}") + print(f"Total SELL ads: {total_sell}") + + print("\nWarnings:") + if warnings: + for w in warnings: + print(f" - {w}") + else: + print(" None") + + print("\nCritical Errors:") + if critical_errors: + for err in critical_errors: + print(f" - [FAIL] {err}") + sys.exit(1) + else: + print(" [PASS] No critical validation issues found!") + sys.exit(0) + + +def main(): + parser = argparse.ArgumentParser(description="Binance P2P Data Collector") + parser.add_argument("--config", default="config.yaml", help="Path to config.yaml file") + parser.add_argument("--once", action="store_true", help="Run a single collection cycle and exit") + parser.add_argument("--validate", help="Path to Parquet file or directory to validate") + + args = parser.parse_args() + + # If validate-only mode requested + if args.validate: + validate_parquet_files(args.validate) + return + + # Load configuration + config = load_config(args.config) + + # Setup logging + setup_logging(config) + + # Initialize scheduler + scheduler = P2PCollectorScheduler(config) + + # Run scheduler + scheduler.run(once=args.once) + + +if __name__ == "__main__": + main() diff --git a/p2p-collector/config.yaml b/p2p-collector/config.yaml new file mode 100644 index 0000000..00ba44d --- /dev/null +++ b/p2p-collector/config.yaml @@ -0,0 +1,32 @@ +binance: + base_url: "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search" + user_agent: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" + timeout_seconds: 15 + max_pages: 10 + request_delay_seconds: 0.5 + +collection: + pairs: + - asset: "USDT" + fiat: "VES" + interval_seconds: 300 + output_dir: "./data/raw" + retry_attempts: 3 + retry_delay_base_seconds: 10 + +validation: + price_min: 1.0 + price_max: 2000.0 + reject_zero_finish_rate: true + reject_zero_surplus: true + +logging: + level: "INFO" + file: "./data/logs/collector.log" + max_bytes: 10485760 # 10 MB + backup_count: 5 + format: "%(asctime)s | %(levelname)s | %(message)s" + +alerts: + consecutive_failure_threshold: 5 + alert_dir: "./data/alerts" diff --git a/p2p-collector/normalizer.py b/p2p-collector/normalizer.py new file mode 100644 index 0000000..fceaa86 --- /dev/null +++ b/p2p-collector/normalizer.py @@ -0,0 +1,99 @@ +import logging +from datetime import datetime, timezone + +logger = logging.getLogger(__name__) + +def normalize_ad(raw_ad: dict, trade_type: str, fetched_at: datetime) -> dict: + """ + Normalizes a single P2P ad dictionary from the Binance API response + into a flattened dictionary schema matching the spec. + """ + adv = raw_ad.get("adv", {}) + adver = raw_ad.get("advertiser", {}) + + # Check if critical structures are missing + if not adv: + logger.warning("Ad structure 'adv' is missing in raw ad data.") + if not adver: + logger.warning("Advertiser structure 'advertiser' is missing in raw ad data.") + + # Extract payment methods + trade_methods = adv.get("tradeMethods") or [] + payment_methods = [] + payment_method_ids = [] + for m in trade_methods: + if isinstance(m, dict): + pay_type = m.get("payType") + identifier = m.get("identifier") + if pay_type: + payment_methods.append(pay_type) + if identifier: + payment_method_ids.append(identifier) + + # Safe float conversion helper + def safe_float(val, default=0.0, field_name=None): + if val is None: + return default + try: + return float(val) + except (ValueError, TypeError) as e: + if field_name: + logger.warning(f"Could not convert field '{field_name}' value {val!r} to float: {e}") + return default + + # Safe int conversion helper + def safe_int(val, default=0, field_name=None): + if val is None: + return default + try: + return int(val) + except (ValueError, TypeError) as e: + if field_name: + logger.warning(f"Could not convert field '{field_name}' value {val!r} to int: {e}") + return default + + # Convert createTime (milliseconds since epoch) to datetime + create_time_ms = adv.get("createTime") + if create_time_ms is not None: + try: + ad_created_at = datetime.fromtimestamp(safe_float(create_time_ms) / 1000.0, tz=timezone.utc) + except Exception as e: + logger.warning(f"Could not parse ad createTime {create_time_ms}: {e}") + ad_created_at = fetched_at + else: + ad_created_at = fetched_at + + # Check for missing expected fields to log warnings, but don't fail + required_keys = ["advNo", "asset", "fiatUnit", "price"] + for key in required_keys: + if key not in adv: + logger.warning(f"Expected key '{key}' not found in 'adv' structure of ad: {raw_ad}") + + if "userNo" not in adver: + logger.warning(f"Expected key 'userNo' not found in 'advertiser' structure of ad: {raw_ad}") + + return { + "snapshot_id": f"{fetched_at.strftime('%Y%m%dT%H%M%SZ')}_{trade_type}", + "fetched_at": fetched_at, + "fetched_date": fetched_at.strftime("%Y-%m-%d"), + "trade_type": trade_type, + "adv_no": adv.get("advNo", ""), + "asset": adv.get("asset", "USDT"), + "fiat": adv.get("fiatUnit", "VES"), + "price": safe_float(adv.get("price"), 0.0, "price"), + "surplus_amount": safe_float(adv.get("surplusAmount"), 0.0, "surplusAmount"), + "min_amount": safe_float(adv.get("minSingleTransAmount"), 0.0, "minSingleTransAmount"), + "max_amount": safe_float(adv.get("maxSingleTransAmount"), 0.0, "maxSingleTransAmount"), + "tradable_quantity": safe_float(adv.get("tradableQuantity"), 0.0, "tradableQuantity"), + "advertiser_no": adver.get("userNo", ""), + "advertiser_name": adver.get("nickName", ""), + "advertiser_type": adver.get("userType", "user"), + "month_order_count": safe_int(adver.get("monthOrderCount"), 0, "monthOrderCount"), + "month_finish_rate": safe_float(adver.get("monthFinishRate"), 0.0, "monthFinishRate"), + "positive_rate": safe_float(adver.get("positiveRate"), 0.0, "positiveRate"), + "user_positive_rate": safe_float(adver.get("userPositiveRate"), 0.0, "userPositiveRate"), + "payment_methods": payment_methods, + "payment_method_ids": payment_method_ids, + "ad_created_at": ad_created_at, + "price_type": adv.get("priceType", "FIXED"), + } diff --git a/p2p-collector/requirements.txt b/p2p-collector/requirements.txt new file mode 100644 index 0000000..ef17617 --- /dev/null +++ b/p2p-collector/requirements.txt @@ -0,0 +1,4 @@ +httpx>=0.27,<1.0 +pandas>=2.0,<3.0 +pyarrow>=14.0 +pyyaml>=6.0,<7.0 diff --git a/p2p-collector/sample_responses/response_buy.json b/p2p-collector/sample_responses/response_buy.json new file mode 100644 index 0000000..3eff38d --- /dev/null +++ b/p2p-collector/sample_responses/response_buy.json @@ -0,0 +1,44 @@ +{ + "data": [ + { + "adv": { + "advNo": "6f8b2e12345", + "tradeType": "BUY", + "asset": "USDT", + "fiatUnit": "VES", + "price": "58.50", + "surplusAmount": "1520.43", + "maxSingleTransAmount": "5000.00", + "minSingleTransAmount": "100.00", + "tradableQuantity": "1520.43", + "createTime": 1749128400000, + "fiatSymbol": "Bs", + "priceType": "FIXED", + "tradeMethods": [ + { + "identifier": "Banco_Banesco", + "payType": "BANESCO", + "payMethodId": "BANESCO" + }, + { + "identifier": "Pago_Movil", + "payType": "PAGO_MOVIL", + "payMethodId": "PAGO_MOVIL" + } + ] + }, + "advertiser": { + "userNo": "ABC123456", + "nickName": "CryptoTraderVE", + "userType": "merchant", + "monthOrderCount": 342, + "monthFinishRate": 0.97, + "positiveRate": 0.99, + "userPositiveRate": 0.99 + } + } + ], + "total": 1, + "pageSize": 20, + "success": true +} diff --git a/p2p-collector/sample_responses/response_sell.json b/p2p-collector/sample_responses/response_sell.json new file mode 100644 index 0000000..06b0540 --- /dev/null +++ b/p2p-collector/sample_responses/response_sell.json @@ -0,0 +1,44 @@ +{ + "data": [ + { + "adv": { + "advNo": "7a9c3d98765", + "tradeType": "SELL", + "asset": "USDT", + "fiatUnit": "VES", + "price": "62.30", + "surplusAmount": "2500.00", + "maxSingleTransAmount": "10000.00", + "minSingleTransAmount": "500.00", + "tradableQuantity": "2500.00", + "createTime": 1749129000000, + "fiatSymbol": "Bs", + "priceType": "FIXED", + "tradeMethods": [ + { + "identifier": "Banco_Mercantil", + "payType": "MERCANTIL", + "payMethodId": "MERCANTIL" + }, + { + "identifier": "Pago_Movil", + "payType": "PAGO_MOVIL", + "payMethodId": "PAGO_MOVIL" + } + ] + }, + "advertiser": { + "userNo": "XYZ789012", + "nickName": "MercantilSeller", + "userType": "merchant", + "monthOrderCount": 512, + "monthFinishRate": 0.99, + "positiveRate": 0.98, + "userPositiveRate": 0.98 + } + } + ], + "total": 1, + "pageSize": 20, + "success": true +} diff --git a/p2p-collector/scheduler.py b/p2p-collector/scheduler.py new file mode 100644 index 0000000..a78f43f --- /dev/null +++ b/p2p-collector/scheduler.py @@ -0,0 +1,240 @@ +import os +import time +import signal +import logging +import traceback +from datetime import datetime, timezone +from pathlib import Path + +from binance_client import BinanceP2PClient, BinanceP2PError +from normalizer import normalize_ad +from validator import validate_row, validate_snapshot +from storage import store_parquet, read_checkpoint, write_checkpoint +from alert import write_alert_file +from utils import jitter, now_utc + +logger = logging.getLogger(__name__) + +class P2PCollectorScheduler: + def __init__(self, config: dict): + self.config = config + self.running = True + + # Configure directories + collection_cfg = config.get("collection", {}) + self.output_dir = collection_cfg.get("output_dir", "./data/raw") + self.interval = collection_cfg.get("interval_seconds", 300) + self.pairs = collection_cfg.get("pairs", [{"asset": "USDT", "fiat": "VES"}]) + + # Checkpoint path: in the parent folder of output_dir (or same directory if raw) + # Spec says: + # data/ + # ├── raw/ + # └── checkpoint.json + # So we look at parent of output_dir + self.data_dir = str(Path(self.output_dir).parent) + self.checkpoint_path = os.path.join(self.data_dir, "checkpoint.json") + + self.client = BinanceP2PClient(config) + self.consecutive_failures = 0 + + # Signal handlers + signal.signal(signal.SIGINT, self._handle_signal) + signal.signal(signal.SIGTERM, self._handle_signal) + + def _handle_signal(self, sig, frame): + logger.info(f"Received signal {sig}. Initiating graceful shutdown after current snapshot completes...") + self.running = False + + def verify_directories(self): + """Verifies that the output directory is writable by writing a test file and deleting it.""" + os.makedirs(self.output_dir, exist_ok=True) + test_file = os.path.join(self.output_dir, ".write_test") + try: + with open(test_file, "w") as f: + f.write("test") + os.remove(test_file) + logger.info(f"Directory write verification passed for {self.output_dir}") + except Exception as e: + logger.critical(f"Directory verification failed on {self.output_dir}: {e}") + raise OSError(f"Output directory not writable: {e}") + + def get_initial_wait_seconds(self) -> float: + """Reads checkpoint to determine how long to wait before starting the loop.""" + checkpoint = read_checkpoint(self.checkpoint_path) + last_completed = checkpoint.get("last_completed_snapshot") + if not last_completed: + return 0.0 + + try: + last_dt = datetime.fromisoformat(last_completed) + # Make sure timezone aware UTC + if last_dt.tzinfo is None: + last_dt = last_dt.replace(tzinfo=timezone.utc) + + elapsed = (now_utc() - last_dt).total_seconds() + wait_time = self.interval - elapsed + if wait_time > 0: + logger.info(f"Resuming. Last snapshot completed {elapsed:.1f}s ago. Initial wait time: {wait_time:.1f}s.") + return wait_time + except Exception as e: + logger.warning(f"Error parsing last snapshot time from checkpoint: {e}. Starting immediately.") + + return 0.0 + + def run_single_cycle(self) -> dict: + """Runs a single snapshot collection cycle for all pairs.""" + cycle_start_time = now_utc() + cycle_stats = {} + + # We process each pair configured + for pair in self.pairs: + asset = pair.get("asset", "USDT") + fiat = pair.get("fiat", "VES") + + start_ts = time.time() + + # 1. Fetch raw advertisements + # Delay between trade types (snapshots) is 1 second + buy_raw = self.client.fetch_all_ads("BUY", asset, fiat) + time.sleep(1.0) + sell_raw = self.client.fetch_all_ads("SELL", asset, fiat) + + # 2. Normalize and Filter individual rows + seen_adv_nos = set() + flat_buy = [] + for ad in buy_raw: + norm = normalize_ad(ad, "BUY", cycle_start_time) + if validate_row(norm, self.config, seen_adv_nos): + flat_buy.append(norm) + + flat_sell = [] + for ad in sell_raw: + norm = normalize_ad(ad, "SELL", cycle_start_time) + if validate_row(norm, self.config, seen_adv_nos): + flat_sell.append(norm) + + # 3. Snapshot-level Validation + combined_ads = flat_buy + flat_sell + validation_summary = validate_snapshot(combined_ads, cycle_start_time) + + # 4. Storage (atomic writes to raw/buy_ads and raw/sell_ads) + buy_path = store_parquet(flat_buy, os.path.join(self.output_dir, "buy_ads"), cycle_start_time) + sell_path = store_parquet(flat_sell, os.path.join(self.output_dir, "sell_ads"), cycle_start_time) + + elapsed = time.time() - start_ts + + # 5. Snapshot Summary Log Line + # Format: 2026-06-05 13:30:00 UTC | BUY=47 ads [54.20–62.80] SELL=53 ads [58.00–68.50] | spread= -4.80 | took 3.2s | methods=[BANESCO,PAGO_MOVIL,MERCANTIL,...] + methods_str = ",".join(validation_summary["methods"]) + logger.info( + f"{cycle_start_time.strftime('%Y-%m-%d %H:%M:%S')} UTC | " + f"BUY={validation_summary['buy_count']} ads [{validation_summary['buy_min']:.2f}-{validation_summary['buy_max']:.2f}] " + f"SELL={validation_summary['sell_count']} ads [{validation_summary['sell_min']:.2f}-{validation_summary['sell_max']:.2f}] | " + f"spread={validation_summary['spread']:.2f} | took {elapsed:.1f}s | " + f"methods=[{methods_str}]" + ) + + cycle_stats[f"{asset}_{fiat}"] = { + "buy_count": validation_summary['buy_count'], + "sell_count": validation_summary['sell_count'], + "timestamp": cycle_start_time.isoformat() + } + + return cycle_stats + + def run(self, once: bool = False): + """Starts the main execution loop.""" + self.verify_directories() + + if once: + logger.info("Executing a single collection cycle (--once)...") + try: + self.run_single_cycle() + logger.info("One-shot collection complete. Exiting.") + except Exception as e: + logger.error(f"Error occurred during one-shot collection: {e}") + logger.error(traceback.format_exc()) + raise e + return + + # Continuous loop startup + initial_wait = self.get_initial_wait_seconds() + if initial_wait > 0 and self.running: + logger.info(f"Sleeping for initial delay of {initial_wait:.1f}s...") + # Sleep in small steps to remain responsive to signals + step = 1.0 + while initial_wait > 0 and self.running: + time.sleep(min(step, initial_wait)) + initial_wait -= step + + logger.info(f"Starting P2P data collector. Interval: {self.interval}s. Pairs: {self.pairs}") + + while self.running: + cycle_start = now_utc() + try: + stats = self.run_single_cycle() + + # Reset failure stats on success + self.consecutive_failures = 0 + self.client.reset_429_backoff() + + # Update checkpoint + checkpoint = read_checkpoint(self.checkpoint_path) + + # Get stats for the primary pair (USDT_VES or first pair) + primary_pair_key = f"{self.pairs[0]['asset']}_{self.pairs[0]['fiat']}" + pair_stats = stats.get(primary_pair_key, {}) + + # Update stats in checkpoint + total_snapshots = checkpoint.get("total_snapshots", 0) + 1 + first_snapshot = checkpoint.get("first_snapshot", cycle_start.isoformat()) + + checkpoint_data = { + "last_completed_snapshot": cycle_start.isoformat(), + "last_buy_ad_count": pair_stats.get("buy_count", 0), + "last_sell_ad_count": pair_stats.get("sell_count", 0), + "consecutive_failures": 0, + "total_snapshots": total_snapshots, + "first_snapshot": first_snapshot, + "version": "1.0" + } + write_checkpoint(self.checkpoint_path, checkpoint_data) + + except Exception as e: + self.consecutive_failures += 1 + tb_str = traceback.format_exc() + logger.error(f"Error during collection cycle (consecutive failures: {self.consecutive_failures}): {e}") + logger.error(tb_str) + + # Write alert file after threshold + alert_threshold = self.config.get("alerts", {}).get("consecutive_failure_threshold", 5) + if self.consecutive_failures >= alert_threshold: + try: + write_alert_file( + self.config, + cycle_start, + str(e), + self.consecutive_failures, + tb_str + ) + except Exception as alert_error: + logger.critical(f"Failed to write alert file: {alert_error}") + + # Update checkpoint with failure status + checkpoint = read_checkpoint(self.checkpoint_path) + checkpoint["consecutive_failures"] = self.consecutive_failures + write_checkpoint(self.checkpoint_path, checkpoint) + + # Determine sleep duration + if self.running: + sleep_sec = jitter(self.interval) + logger.info(f"Sleeping for {sleep_sec:.1f}s before next cycle...") + + # Sleep in small steps to handle signal termination cleanly + step = 1.0 + while sleep_sec > 0 and self.running: + time.sleep(min(step, sleep_sec)) + sleep_sec -= step + + logger.info("Collector has shut down gracefully.") diff --git a/p2p-collector/storage.py b/p2p-collector/storage.py new file mode 100644 index 0000000..86837aa --- /dev/null +++ b/p2p-collector/storage.py @@ -0,0 +1,102 @@ +import os +import json +import logging +from pathlib import Path +from datetime import datetime +import pandas as pd + +logger = logging.getLogger(__name__) + +def store_parquet(rows: list, base_dir: str, fetched_at: datetime) -> str: + """ + Stores rows of ads to a Parquet file atomically in a date-partitioned directory. + Also writes an empty _schema.parquet file for the schema reference if it doesn't exist. + Returns the path to the written final Parquet file. + """ + if not rows: + logger.warning("No rows provided to store_parquet.") + return "" + + year = fetched_at.strftime("%Y") + month = fetched_at.strftime("%m") + day = fetched_at.strftime("%d") + filename = f"snapshot_{fetched_at.strftime('%Y%m%d_%H%M%S')}.parquet" + + dest_dir = Path(base_dir) / f"year={year}" / f"month={month}" / f"day={day}" + dest_dir.mkdir(parents=True, exist_ok=True) + + tmp_path = dest_dir / (filename + ".tmp") + final_path = dest_dir / filename + + try: + df = pd.DataFrame(rows) + # Sort columns to ensure consistent schema layout + df = df.reindex(sorted(df.columns), axis=1) + + # Write atomically using a temporary file + df.to_parquet(tmp_path, index=False, engine="pyarrow") + tmp_path.rename(final_path) + logger.info(f"Successfully stored snapshot to {final_path}") + + # Schema consistency reference file + schema_path = dest_dir / "_schema.parquet" + if not schema_path.exists(): + try: + # Write an empty dataframe with identical columns and schema + df.iloc[:0].to_parquet(schema_path, index=False, engine="pyarrow") + logger.info(f"Created schema reference file at {schema_path}") + except Exception as e: + logger.error(f"Failed to create schema reference file: {e}") + + return str(final_path) + except Exception as e: + logger.error(f"Failed to write parquet file {final_path}: {e}") + # Clean up tmp file if it exists + if tmp_path.exists(): + try: + tmp_path.unlink() + except Exception as cleanup_error: + logger.error(f"Failed to delete temp file {tmp_path}: {cleanup_error}") + raise e + + +def read_checkpoint(path: str) -> dict: + """ + Reads the checkpoint JSON file if it exists. + Returns a dictionary, or an empty dict if the file is missing or corrupted. + """ + if not os.path.exists(path): + logger.info(f"Checkpoint file {path} not found. Starting fresh.") + return {} + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + logger.info(f"Successfully read checkpoint from {path}") + return data + except Exception as e: + logger.warning(f"Checkpoint file {path} exists but is corrupted: {e}. Starting fresh.") + return {} + + +def write_checkpoint(path: str, data: dict) -> None: + """ + Writes the checkpoint dictionary to the specified JSON file path. + Uses atomic write to prevent corruption. + """ + dir_path = os.path.dirname(path) + if dir_path: + os.makedirs(dir_path, exist_ok=True) + + tmp_path = f"{path}.tmp" + try: + with open(tmp_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=4) + os.replace(tmp_path, path) + logger.debug(f"Checkpoint updated at {path}") + except Exception as e: + logger.error(f"Failed to write checkpoint to {path}: {e}") + if os.path.exists(tmp_path): + try: + os.remove(tmp_path) + except Exception: + pass diff --git a/p2p-collector/tests/test_normalizer.py b/p2p-collector/tests/test_normalizer.py new file mode 100644 index 0000000..2a6b37a --- /dev/null +++ b/p2p-collector/tests/test_normalizer.py @@ -0,0 +1,107 @@ +import unittest +import json +import os +from datetime import datetime, timezone + +# Add the parent folder to path to import normalizer +import sys +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from normalizer import normalize_ad + +class TestNormalizer(unittest.TestCase): + def setUp(self): + # Paths to sample responses + current_dir = os.path.dirname(os.path.abspath(__file__)) + self.buy_json_path = os.path.join(current_dir, "..", "sample_responses", "response_buy.json") + self.sell_json_path = os.path.join(current_dir, "..", "sample_responses", "response_sell.json") + + def test_normalize_buy_ad(self): + with open(self.buy_json_path, "r", encoding="utf-8") as f: + data = json.load(f) + + raw_ad = data["data"][0] + fetched_at = datetime(2026, 6, 5, 13, 30, 0, tzinfo=timezone.utc) + + normalized = normalize_ad(raw_ad, "BUY", fetched_at) + + # Verify schema keys + expected_keys = { + "snapshot_id", "fetched_at", "fetched_date", "trade_type", "adv_no", + "asset", "fiat", "price", "surplus_amount", "min_amount", "max_amount", + "tradable_quantity", "advertiser_no", "advertiser_name", "advertiser_type", + "month_order_count", "month_finish_rate", "positive_rate", "user_positive_rate", + "payment_methods", "payment_method_ids", "ad_created_at", "price_type" + } + self.assertEqual(set(normalized.keys()), expected_keys) + + # Verify content mapping + self.assertEqual(normalized["snapshot_id"], "20260605T133000Z_BUY") + self.assertEqual(normalized["fetched_at"], fetched_at) + self.assertEqual(normalized["fetched_date"], "2026-06-05") + self.assertEqual(normalized["trade_type"], "BUY") + self.assertEqual(normalized["adv_no"], "6f8b2e12345") + self.assertEqual(normalized["asset"], "USDT") + self.assertEqual(normalized["fiat"], "VES") + self.assertEqual(normalized["price"], 58.50) + self.assertEqual(normalized["surplus_amount"], 1520.43) + self.assertEqual(normalized["min_amount"], 100.0) + self.assertEqual(normalized["max_amount"], 5000.0) + self.assertEqual(normalized["tradable_quantity"], 1520.43) + self.assertEqual(normalized["advertiser_no"], "ABC123456") + self.assertEqual(normalized["advertiser_name"], "CryptoTraderVE") + self.assertEqual(normalized["advertiser_type"], "merchant") + self.assertEqual(normalized["month_order_count"], 342) + self.assertEqual(normalized["month_finish_rate"], 0.97) + self.assertEqual(normalized["positive_rate"], 0.99) + self.assertEqual(normalized["user_positive_rate"], 0.99) + self.assertEqual(normalized["payment_methods"], ["BANESCO", "PAGO_MOVIL"]) + self.assertEqual(normalized["payment_method_ids"], ["Banco_Banesco", "Pago_Movil"]) + self.assertEqual(normalized["price_type"], "FIXED") + + # Verify ad creation time parsed from 1749128400000ms + expected_create_time = datetime.fromtimestamp(1749128400000 / 1000, tz=timezone.utc) + self.assertEqual(normalized["ad_created_at"], expected_create_time) + + def test_normalize_sell_ad(self): + with open(self.sell_json_path, "r", encoding="utf-8") as f: + data = json.load(f) + + raw_ad = data["data"][0] + fetched_at = datetime(2026, 6, 5, 13, 30, 0, tzinfo=timezone.utc) + + normalized = normalize_ad(raw_ad, "SELL", fetched_at) + + self.assertEqual(normalized["snapshot_id"], "20260605T133000Z_SELL") + self.assertEqual(normalized["trade_type"], "SELL") + self.assertEqual(normalized["adv_no"], "7a9c3d98765") + self.assertEqual(normalized["price"], 62.30) + self.assertEqual(normalized["payment_methods"], ["MERCANTIL", "PAGO_MOVIL"]) + self.assertEqual(normalized["payment_method_ids"], ["Banco_Mercantil", "Pago_Movil"]) + + def test_defensive_handling(self): + # Test handling missing or corrupted keys + bad_raw_ad = { + "adv": { + "advNo": "bad_ad", + "price": "not_a_float", + "surplusAmount": None + }, + "advertiser": { + "userNo": "bad_advertiser", + "monthOrderCount": "not_an_int" + } + } + + fetched_at = datetime.now(timezone.utc) + normalized = normalize_ad(bad_raw_ad, "BUY", fetched_at) + + # Should not crash, should fall back to defaults + self.assertEqual(normalized["adv_no"], "bad_ad") + self.assertEqual(normalized["price"], 0.0) # fallback + self.assertEqual(normalized["surplus_amount"], 0.0) # fallback + self.assertEqual(normalized["month_order_count"], 0) # fallback + self.assertEqual(normalized["advertiser_no"], "bad_advertiser") + +if __name__ == "__main__": + unittest.main() diff --git a/p2p-collector/tests/test_storage.py b/p2p-collector/tests/test_storage.py new file mode 100644 index 0000000..4db2a4d --- /dev/null +++ b/p2p-collector/tests/test_storage.py @@ -0,0 +1,72 @@ +import unittest +import os +import tempfile +from pathlib import Path +from datetime import datetime, timezone +import pandas as pd + +import sys +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from storage import store_parquet, read_checkpoint, write_checkpoint + +class TestStorage(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + self.base_dir = self.temp_dir.name + + def tearDown(self): + self.temp_dir.cleanup() + + def test_store_parquet(self): + fetched_at = datetime(2026, 6, 5, 13, 30, 0, tzinfo=timezone.utc) + rows = [ + {"col1": "val1", "col2": 1.5, "col3": True}, + {"col1": "val2", "col2": 2.5, "col3": False} + ] + + # Write + final_path = store_parquet(rows, self.base_dir, fetched_at) + + # Verify path exists + self.assertTrue(os.path.exists(final_path)) + + # Verify partition path structure: year=2026/month=06/day=05/snapshot_20260605_133000.parquet + expected_subdir = os.path.join(self.base_dir, "year=2026", "month=06", "day=05") + self.assertTrue(final_path.startswith(expected_subdir)) + self.assertTrue(final_path.endswith("snapshot_20260605_133000.parquet")) + + # Verify schema file exists + schema_path = os.path.join(expected_subdir, "_schema.parquet") + self.assertTrue(os.path.exists(schema_path)) + + # Read schema back and verify it's empty but has columns + df_schema = pd.read_parquet(schema_path) + self.assertEqual(len(df_schema), 0) + self.assertEqual(list(df_schema.columns), sorted(["col1", "col2", "col3"])) + + # Read data back and verify content + df_data = pd.read_parquet(final_path) + self.assertEqual(len(df_data), 2) + self.assertEqual(df_data.iloc[0]["col1"], "val1") + self.assertEqual(df_data.iloc[1]["col2"], 2.5) + + def test_checkpoint(self): + checkpoint_path = os.path.join(self.base_dir, "checkpoint.json") + + # Test missing checkpoint + self.assertEqual(read_checkpoint(checkpoint_path), {}) + + # Test write and read + data = { + "last_completed_snapshot": "2026-06-05T13:30:00Z", + "last_buy_ad_count": 47, + "last_sell_ad_count": 53 + } + write_checkpoint(checkpoint_path, data) + + read_data = read_checkpoint(checkpoint_path) + self.assertEqual(read_data, data) + +if __name__ == "__main__": + unittest.main() diff --git a/p2p-collector/tests/test_validator.py b/p2p-collector/tests/test_validator.py new file mode 100644 index 0000000..bce5942 --- /dev/null +++ b/p2p-collector/tests/test_validator.py @@ -0,0 +1,97 @@ +import unittest +from datetime import datetime, timezone + +import os +import sys +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from validator import validate_row, validate_snapshot + +class TestValidator(unittest.TestCase): + def setUp(self): + self.config = { + "validation": { + "price_min": 1.0, + "price_max": 500.0, + "reject_zero_finish_rate": True, + "reject_zero_surplus": True + } + } + + self.valid_row = { + "adv_no": "12345", + "price": 58.50, + "surplus_amount": 100.0, + "month_finish_rate": 0.95, + "month_order_count": 100, + "payment_methods": ["BANESCO"], + "ad_created_at": datetime.now(timezone.utc) + } + + def test_validate_row_valid(self): + seen = set() + self.assertTrue(validate_row(self.valid_row, self.config, seen)) + self.assertIn("12345", seen) + + def test_validate_row_duplicate(self): + seen = {"12345"} + self.assertFalse(validate_row(self.valid_row, self.config, seen)) + + def test_validate_row_invalid_price(self): + seen = set() + + # Price <= 0 + row_bad_price = self.valid_row.copy() + row_bad_price["price"] = -1.0 + self.assertFalse(validate_row(row_bad_price, self.config, seen)) + + # Price > max + row_high_price = self.valid_row.copy() + row_high_price["price"] = 1000.0 + self.assertFalse(validate_row(row_high_price, self.config, seen)) + + def test_validate_row_zero_surplus(self): + seen = set() + row_zero_surplus = self.valid_row.copy() + row_zero_surplus["surplus_amount"] = 0.0 + self.assertFalse(validate_row(row_zero_surplus, self.config, seen)) + + def test_validate_row_suspicious_finish(self): + seen = set() + row_suspicious = self.valid_row.copy() + row_suspicious["month_finish_rate"] = 0.0 + row_suspicious["month_order_count"] = 5 + self.assertFalse(validate_row(row_suspicious, self.config, seen)) + + def test_validate_snapshot_empty(self): + with self.assertRaises(ValueError): + validate_snapshot([], datetime.now(timezone.utc)) + + def test_validate_snapshot_calculations(self): + fetched_at = datetime.now(timezone.utc) + ads = [ + # BUY ads + {"trade_type": "BUY", "price": 58.00, "payment_methods": ["BANESCO"], "ad_created_at": fetched_at}, + {"trade_type": "BUY", "price": 59.00, "payment_methods": ["PAGO_MOVIL"], "ad_created_at": fetched_at}, + # SELL ads + {"trade_type": "SELL", "price": 61.00, "payment_methods": ["MERCANTIL"], "ad_created_at": fetched_at}, + {"trade_type": "SELL", "price": 62.00, "payment_methods": ["BANESCO"], "ad_created_at": fetched_at}, + ] + + summary = validate_snapshot(ads, fetched_at) + + self.assertEqual(summary["buy_count"], 2) + self.assertEqual(summary["sell_count"], 2) + self.assertEqual(summary["buy_min"], 58.00) + self.assertEqual(summary["buy_max"], 59.00) + self.assertEqual(summary["sell_min"], 61.00) + self.assertEqual(summary["sell_max"], 62.00) + self.assertEqual(summary["buy_median"], 58.50) + self.assertEqual(summary["sell_median"], 61.50) + + # spread = sell_min - buy_max = 61.00 - 59.00 = 2.00 + self.assertEqual(summary["spread"], 2.00) + self.assertEqual(set(summary["methods"]), {"BANESCO", "PAGO_MOVIL", "MERCANTIL"}) + +if __name__ == "__main__": + unittest.main() diff --git a/p2p-collector/utils.py b/p2p-collector/utils.py new file mode 100644 index 0000000..06b66ad --- /dev/null +++ b/p2p-collector/utils.py @@ -0,0 +1,15 @@ +import random +from datetime import datetime, timezone + +def jitter(interval: float) -> float: + """ + Returns a value within interval ± 10% + """ + variation = interval * 0.10 + return interval + random.uniform(-variation, variation) + +def now_utc() -> datetime: + """ + Returns the current UTC datetime with timezone info. + """ + return datetime.now(timezone.utc) diff --git a/p2p-collector/validator.py b/p2p-collector/validator.py new file mode 100644 index 0000000..b87ac78 --- /dev/null +++ b/p2p-collector/validator.py @@ -0,0 +1,147 @@ +import logging +from datetime import datetime, timezone +import numpy as np + +logger = logging.getLogger(__name__) + +def validate_row(row: dict, config: dict, seen_adv_nos: set) -> bool: + """ + Validates a single normalized ad row. + Returns True if the row is valid, or False if it should be rejected. + """ + val_config = config.get("validation", {}) + price_min = val_config.get("price_min", 1.0) + price_max = val_config.get("price_max", 500.0) + reject_zero_finish = val_config.get("reject_zero_finish_rate", True) + reject_zero_surplus = val_config.get("reject_zero_surplus", True) + + # 1. Empty adv_no + adv_no = row.get("adv_no") + if not adv_no: + logger.error("Rejecting ad: Missing adv_no.") + return False + + # 2. Duplicate adv_no within same snapshot + if adv_no in seen_adv_nos: + logger.warning(f"Rejecting ad {adv_no}: Duplicate within the same snapshot.") + return False + + # 3. Price is None or <= 0 + price = row.get("price") + if price is None or price <= 0: + logger.warning(f"Rejecting ad {adv_no}: Price is None or <= 0 ({price}).") + return False + + # 4. Price outside expected range + if price < price_min or price > price_max: + logger.warning( + f"Rejecting ad {adv_no}: Price {price} is outside configured range [{price_min}, {price_max}]." + ) + return False + + # 5. Surplus amount None or <= 0 + surplus = row.get("surplus_amount") + if reject_zero_surplus and (surplus is None or surplus <= 0): + logger.debug(f"Rejecting ad {adv_no}: Surplus amount is None or <= 0 ({surplus}).") + return False + + # 6. Suspicious advertiser stats: monthFinishRate is 0.0 and monthOrderCount > 0 + finish_rate = row.get("month_finish_rate") + order_count = row.get("month_order_count") + if reject_zero_finish and finish_rate == 0.0 and order_count > 0: + logger.warning( + f"Rejecting ad {adv_no}: Advertiser finished 0.0% of {order_count} orders." + ) + return False + + seen_adv_nos.add(adv_no) + return True + + +def validate_snapshot(flat_ads: list, fetched_at: datetime) -> dict: + """ + Validates a list of all normalized and filtered ads in a single snapshot. + Raises ValueError on critical issues (like completely empty snapshot). + Returns a dictionary of summary statistics for logging/checking. + """ + buy_ads = [ad for ad in flat_ads if ad.get("trade_type") == "BUY"] + sell_ads = [ad for ad in flat_ads if ad.get("trade_type") == "SELL"] + + # 1. Empty snapshot validation + if not buy_ads and not sell_ads: + raise ValueError("CRITICAL: Empty snapshot! Both BUY and SELL ad counts are 0.") + + buy_count = len(buy_ads) + sell_count = len(sell_ads) + + # Warnings for low/high counts + if buy_count < 20 or buy_count > 200: + logger.warning(f"Unusual BUY ad count: {buy_count} (expected 20-200).") + if sell_count < 20 or sell_count > 200: + logger.warning(f"Unusual SELL ad count: {sell_count} (expected 20-200).") + + # Extract prices + buy_prices = [ad["price"] for ad in buy_ads] + sell_prices = [ad["price"] for ad in sell_ads] + + buy_min = min(buy_prices) if buy_prices else 0.0 + buy_max = max(buy_prices) if buy_prices else 0.0 + sell_min = min(sell_prices) if sell_prices else 0.0 + sell_max = max(sell_prices) if sell_prices else 0.0 + + # Calculate medians + buy_median = float(np.median(buy_prices)) if buy_prices else 0.0 + sell_median = float(np.median(sell_prices)) if sell_prices else 0.0 + + # Calculate spread: SELL_min - BUY_max + # Wait, spec says: spread = SELL_min - BUY_max + spread = sell_min - buy_max if (sell_prices and buy_prices) else 0.0 + + # Under normal market conditions, advertisers charge a premium when they sell crypto + # to you (i.e. sell_ads: you BUY from advertiser, so you pay advertiser's SELL price). + # Wait, let's verify what the trade types mean in the spec: + # "tradeType: BUY = advertiser wants to give you VES in exchange for your USDT. They are buying USDT from you." + # So advertiser is BUYING crypto. Since they want to buy, they want to pay as little VES as possible. + # "tradeType: SELL = advertiser wants to give you USDT in exchange for your VES. They are selling USDT to you." + # So advertiser is SELLING crypto. Since they are selling, they want to receive as much VES as possible. + # Therefore, advertiser's SELL price should be higher than advertiser's BUY price. + # So SELL_min should be higher than BUY_max. + # If not (e.g., BUY_max > SELL_min), we have a negative spread or overlap. + if sell_prices and buy_prices: + if buy_max > sell_min: + logger.warning( + f"BUY/SELL price overlap detected! Max BUY price ({buy_max:.2f}) > Min SELL price ({sell_min:.2f})." + ) + + # Check for stale ads (createTime > 7 days old) + stale_count = 0 + for ad in flat_ads: + created_at = ad.get("ad_created_at") + if created_at: + age_days = (fetched_at - created_at).total_seconds() / (24 * 3600) + if age_days > 7.0: + stale_count += 1 + + if stale_count > 0: + logger.warning(f"Stale ads detected: {stale_count} ads were created > 7 days ago.") + + # Get unique payment methods + all_methods = set() + for ad in flat_ads: + all_methods.update(ad.get("payment_methods", [])) + + summary = { + "buy_count": buy_count, + "sell_count": sell_count, + "buy_min": buy_min, + "buy_max": buy_max, + "sell_min": sell_min, + "sell_max": sell_max, + "buy_median": buy_median, + "sell_median": sell_median, + "spread": spread, + "stale_count": stale_count, + "methods": sorted(list(all_methods)), + } + + return summary