Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
# Binance P2P Data Collection — Detailed Implementation Spec

> **Purpose:** This document is the single source of truth for the data collection phase. Every field, every endpoint, every edge case is specified so a coder can implement without ambiguity.
>
> **Status:** Phase 1 — Data Collection only. No ML. No trading. No algorithm decisions yet.

---


## 1. The Core Loop (Exact Pseudocode)

```
consecutive_failures = 0

while running:                                   # set to False by the SIGINT/SIGTERM handler (Section 7.2)
    started = monotonic()
    now_utc = utc_now()                          # one timestamp shared by the BUY and SELL files
    try:
        buy_snap = fetch_all_ads(trade_type="BUY", asset="USDT", fiat="VES")
        sleep(1)                                 # 1 s between BUY and SELL (Section 2.5)
        sell_snap = fetch_all_ads(trade_type="SELL", asset="USDT", fiat="VES")

        flat_buy = [normalize_ad(ad, "BUY", now_utc) for ad in buy_snap]
        flat_sell = [normalize_ad(ad, "SELL", now_utc) for ad in sell_snap]

        flat_buy = [row for row in flat_buy if validate_row(row)]     # row-level rules (Section 6.1)
        flat_sell = [row for row in flat_sell if validate_row(row)]
        validate_snapshot(flat_buy, flat_sell)                        # snapshot-level checks (Section 6.2)

        store_parquet(flat_buy, data_dir / "raw" / "buy_ads", now_utc)
        store_parquet(flat_sell, data_dir / "raw" / "sell_ads", now_utc)

        consecutive_failures = 0                 # reset the streak on success
        write_checkpoint(now_utc)
        log_success(len(flat_buy), len(flat_sell), elapsed=monotonic() - started)

    except Exception as e:
        consecutive_failures += 1
        log_error(e, consecutive_failures)
        if consecutive_failures >= 5:
            write_alert_file(e, consecutive_failures)   # human needs to check

    sleep(jitter(interval_seconds))              # default 300 s ± 10% (± 30 s)
```

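The loop's `jitter()` helper is only specified as "± 10%". Here is a minimal sketch that assumes a uniform draw over that band. The `fraction` parameter is an assumption, since `config.yaml` has no setting for it.

```python
import random
from typing import Optional


def jitter(interval_seconds: float, fraction: float = 0.10,
           rng: Optional[random.Random] = None) -> float:
    """Return interval_seconds shifted uniformly by up to +/- fraction.

    With the defaults, 300 s becomes a value in [270, 330].
    """
    if interval_seconds < 0 or not 0 <= fraction < 1:
        raise ValueError("interval must be >= 0 and fraction in [0, 1)")
    rng = rng or random.Random()
    spread = interval_seconds * fraction
    return interval_seconds + rng.uniform(-spread, spread)
```

Drawing a fresh value every cycle keeps the collector from firing at the same wall-clock second each time.
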

---

## 2. API Client — Exact Implementation

### 2.1 Endpoint

```
POST https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search
```

**No API key.** This is fully public.


### 2.2 Headers

| Header | Value |
|---|---|
| `Content-Type` | `application/json` |
| `User-Agent` | `Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36` |
| `Accept` | `*/*` |
| `Origin` | `https://p2p.binance.com` |
| `Referer` | `https://p2p.binance.com/` |


### 2.3 Request Body (BUY example)

```json
{
  "asset": "USDT",
  "fiat": "VES",
  "tradeType": "BUY",
  "page": 1,
  "rows": 20,
  "payTypes": [],
  "countries": [],
  "publisherType": null,
  "classify": "personal",
  "filter": {}
}
```

**Key notes for the coder:**

- `tradeType: "BUY"` = advertiser wants to **give you VES** in exchange for your USDT. They are *buying* USDT from you.
- `tradeType: "SELL"` = advertiser wants to **give you USDT** in exchange for your VES. They are *selling* USDT to you.
- **Verify the two definitions above on the first run.** The Binance P2P website names its Buy/Sell tabs from the taker's point of view: the Buy tab lists ads where *you* buy USDT. The request `tradeType` may follow the same convention, which would swap the meanings. Section 8, Step 2 settles the question, because the side whose ads carry the higher prices is the side where advertisers are selling USDT. The collector stores the request value as-is, so only the interpretation would change.
- `payTypes: []` = no filter; return all payment methods
- `rows: 20` = Binance's max per page (do not change)
- `publisherType: null` = both merchants and regular users
- `classify: "personal"` = personal ads (not business); covers the P2P marketplace

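
### 2.4 Pagination Logic

```python
import logging
import time

import httpx

URL = "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search"
HEADERS = {  # Section 2.2
    "Content-Type": "application/json",
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "*/*",
    "Origin": "https://p2p.binance.com",
    "Referer": "https://p2p.binance.com/",
}
ROWS_PER_PAGE = 20


class APIError(Exception):
    """HTTP 200, but the payload says success=false."""


def fetch_all_ads(trade_type, asset, fiat, max_pages=10, page_delay=0.5):
    all_ads = []
    total = 0

    for page in range(1, max_pages + 1):
        body = {
            "asset": asset,
            "fiat": fiat,
            "tradeType": trade_type,
            "page": page,
            "rows": ROWS_PER_PAGE,
            "payTypes": [],
            "countries": [],
            "publisherType": None,
            "classify": "personal",
            "filter": {},
        }

        resp = httpx.post(URL, json=body, headers=HEADERS, timeout=15)
        resp.raise_for_status()  # 429/5xx raise httpx.HTTPStatusError; caller applies Section 2.5
        data = resp.json()

        if not data.get("success"):
            raise APIError(f"API returned success=false: {data}")

        ads = data.get("data") or []  # guard against "data": null
        total = data.get("total") or 0
        all_ads.extend(ads)

        # Stop on an empty page, when everything has been collected,
        # or when the next page would start beyond `total`.
        if not ads or len(all_ads) >= total or page * ROWS_PER_PAGE >= total:
            break

        if page < max_pages:
            time.sleep(page_delay)  # 500 ms between pages

    if len(all_ads) < total:  # max_pages * 20 caps a snapshot at 200 ads; make truncation visible
        logging.warning("%s: kept %d of %d ads (max_pages=%d)", trade_type, len(all_ads), total, max_pages)
    return all_ads
```
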

### 2.5 Rate Limiting — Defensive Strategy

| Event | Wait time | Notes |
|---|---|---|
| Between pages (same snapshot) | 500 ms | Fixed |
| Between snapshots (BUY → SELL) | 1 s | Fixed |
| Between full cycles | 300 s ± 30 s | Jittered so cycles don't fire at the same clock second every time |
| HTTP 429 (rate limited) | 60 s → 120 s → 240 s → 480 s | Exponential backoff, capped at 480 s |
| Connection error | 30 s, then retry | Transient network issues |
| 5xx server error | 60 s, then retry | Binance server-side issues |

**Important:** After a 429, reset the backoff to 60 s once one full snapshot (BUY + SELL) succeeds.

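The table maps onto a single delay function. This is a minimal sketch. `retry_delay` and the error-kind strings are hypothetical. The caller is assumed to count consecutive 429s and to reset that count after a successful full snapshot, as the note above requires.

```python
def retry_delay(kind: str, consecutive_429s: int = 0) -> int:
    """Seconds to wait before retrying, following the rate-limiting table.

    kind: "rate_limited" (HTTP 429), "connection", or "server" (HTTP 5xx).
    consecutive_429s: number of 429s in a row, including the current one.
    """
    if kind == "rate_limited":
        # 60 -> 120 -> 240 -> 480, then stay at the 480 s cap
        exponent = max(consecutive_429s, 1) - 1
        return min(60 * 2 ** exponent, 480)
    if kind == "connection":
        return 30
    if kind == "server":
        return 60
    raise ValueError(f"unknown error kind: {kind!r}")
```
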

### 2.6 Proxy Support (Optional — keep simple first)

Start with **no proxy**, connecting directly from the VPS. Only add proxy rotation if we actually hit rate limits. At one collection cycle every 5 minutes (at most `max_pages` requests per side), Binance is unlikely to rate-limit P2P queries.

---

## 3. Normalization — Exact Field Mapping

### 3.1 The Flattened Schema (one row = one ad)


| # | Output field | Type | JSON path | Notes |
|---|---|---|---|---|
| 1 | `snapshot_id` | string | auto: `{fetch_ts}_{trade_type}` | Compact UTC timestamp, e.g. `"20260605T133000Z_BUY"` |
| 2 | `fetched_at` | datetime | auto: now_utc | Always UTC |
| 3 | `fetched_date` | string | auto: YYYY-MM-DD | Partition column |
| 4 | `trade_type` | string | request `tradeType` (the `trade_type` argument of `normalize_ad`) | "BUY" or "SELL". Taken from the request, not from `adv.tradeType`, which may report the advertiser's side |
| 5 | `adv_no` | string | `adv.advNo` | Unique ad ID |
| 6 | `asset` | string | `adv.asset` | "USDT" |
| 7 | `fiat` | string | `adv.fiatUnit` | "VES" |
| 8 | `price` | float | `adv.price` | Parse as float |
| 9 | `surplus_amount` | float | `adv.surplusAmount` | Remaining USDT |
| 10 | `min_amount` | float | `adv.minSingleTransAmount` | Minimum per-order limit; likely in fiat (VES), verify against the website |
| 11 | `max_amount` | float | `adv.maxSingleTransAmount` | Maximum per-order limit; likely in fiat (VES), verify against the website |
| 12 | `tradable_quantity` | float | `adv.tradableQuantity` | Often equal to `surplus_amount`; kept in case the two differ |
| 13 | `advertiser_no` | string | `advertiser.userNo` | **Stable ID**: use this |
| 14 | `advertiser_name` | string | `advertiser.nickName` | For reference only |
| 15 | `advertiser_type` | string | `advertiser.userType` | "merchant" or "user" |
| 16 | `month_order_count` | int | `advertiser.monthOrderCount` | |
| 17 | `month_finish_rate` | float | `advertiser.monthFinishRate` | 0.0 to 1.0 |
| 18 | `positive_rate` | float | `advertiser.positiveRate` | 0.0 to 1.0 |
| 19 | `user_positive_rate` | float | `advertiser.userPositiveRate` | Older field, same idea |
| 20 | `payment_methods` | list[str] | `adv.tradeMethods[].payType` | e.g. `["BANESCO", "PAGO_MOVIL"]` |
| 21 | `payment_method_ids` | list[str] | `adv.tradeMethods[].identifier` | e.g. `["Banco_Banesco", "Pago_Movil"]` |
| 22 | `ad_created_at` | datetime | `adv.createTime` | Unix epoch milliseconds → UTC datetime |
| 23 | `price_type` | string | `adv.priceType` | Usually "FIXED" |

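The table can be mirrored in code so that the "23 columns, correct types" checks in Sections 8 and 12 run automatically. This is a minimal sketch. `EXPECTED_SCHEMA` and `schema_errors` are hypothetical names. The types are the in-memory Python types a normalized row is assumed to hold before it reaches Parquet. `None` values are deliberately skipped here and left to the row-level rules in Section 6.1.

```python
from datetime import datetime

# Output field -> expected Python type, in the order of the schema table.
EXPECTED_SCHEMA = {
    "snapshot_id": str, "fetched_at": datetime, "fetched_date": str,
    "trade_type": str, "adv_no": str, "asset": str, "fiat": str,
    "price": float, "surplus_amount": float, "min_amount": float,
    "max_amount": float, "tradable_quantity": float,
    "advertiser_no": str, "advertiser_name": str, "advertiser_type": str,
    "month_order_count": int, "month_finish_rate": float,
    "positive_rate": float, "user_positive_rate": float,
    "payment_methods": list, "payment_method_ids": list,
    "ad_created_at": datetime, "price_type": str,
}


def schema_errors(row: dict) -> list:
    """Return human-readable problems; an empty list means the row matches."""
    errors = []
    missing = EXPECTED_SCHEMA.keys() - row.keys()
    extra = row.keys() - EXPECTED_SCHEMA.keys()
    if missing:
        errors.append(f"missing: {sorted(missing)}")
    if extra:
        errors.append(f"unexpected: {sorted(extra)}")
    for name, typ in EXPECTED_SCHEMA.items():
        value = row.get(name)
        if value is not None and not isinstance(value, typ):
            errors.append(f"{name}: expected {typ.__name__}, got {type(value).__name__}")
    return errors
```
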

### 3.2 JSON Path Details (nested structure)

The API response has this structure:

```json
{
  "data": [
    {
      "adv": {
        "advNo": "6f8b2e...",
        "tradeType": "BUY",
        "asset": "USDT",
        "fiatUnit": "VES",
        "price": "58.50",
        "surplusAmount": "1520.43",
        "maxSingleTransAmount": "5000.00",
        "minSingleTransAmount": "100.00",
        "tradableQuantity": "1520.43",
        "createTime": 1749128400000,
        "fiatSymbol": "Bs",
        "priceType": "FIXED",
        "tradeMethods": [
          {
            "identifier": "Banco_Banesco",
            "payType": "BANESCO",
            "payMethodId": "BANESCO"
          },
          {
            "identifier": "Pago_Movil",
            "payType": "PAGO_MOVIL",
            "payMethodId": "PAGO_MOVIL"
          }
        ]
      },
      "advertiser": {
        "userNo": "ABC123",
        "nickName": "CryptoTraderVE",
        "userType": "merchant",
        "monthOrderCount": 342,
        "monthFinishRate": 0.97,
        "positiveRate": 0.99,
        "userPositiveRate": 0.99
      }
    }
  ],
  "total": 156,
  "pageSize": 20,
  "success": true
}
```

### 3.3 Normalization Code Sketch

```python
from datetime import datetime, timezone


def _to_float(value, default=0.0):
    """Binance sends numbers as strings ("58.50"); None or "" -> default."""
    return float(value) if value not in (None, "") else default


def normalize_ad(raw_ad: dict, trade_type: str, fetched_at: datetime) -> dict:
    adv = raw_ad.get("adv") or {}
    adver = raw_ad.get("advertiser") or {}
    methods = adv.get("tradeMethods") or []  # may be missing, null, or []
    create_ms = adv.get("createTime")

    return {
        "snapshot_id": f"{fetched_at.strftime('%Y%m%dT%H%M%SZ')}_{trade_type}",
        "fetched_at": fetched_at,  # timezone-aware UTC
        "fetched_date": fetched_at.strftime("%Y-%m-%d"),
        "trade_type": trade_type,  # request side (Section 3.1)
        "adv_no": adv.get("advNo", ""),  # empty -> rejected by Section 6.1
        "asset": adv.get("asset", ""),
        "fiat": adv.get("fiatUnit", ""),
        "price": _to_float(adv.get("price"), default=None),  # None -> rejected by Section 6.1
        "surplus_amount": _to_float(adv.get("surplusAmount")),
        "min_amount": _to_float(adv.get("minSingleTransAmount")),
        "max_amount": _to_float(adv.get("maxSingleTransAmount")),
        "tradable_quantity": _to_float(adv.get("tradableQuantity")),
        "advertiser_no": adver.get("userNo", ""),
        "advertiser_name": adver.get("nickName", ""),
        "advertiser_type": adver.get("userType") or "user",
        "month_order_count": int(adver.get("monthOrderCount") or 0),
        "month_finish_rate": _to_float(adver.get("monthFinishRate")),
        "positive_rate": _to_float(adver.get("positiveRate")),
        "user_positive_rate": _to_float(adver.get("userPositiveRate")),
        "payment_methods": [m.get("payType") for m in methods],  # e.g. ["BANESCO", "PAGO_MOVIL"]
        "payment_method_ids": [m.get("identifier") for m in methods],  # e.g. ["Banco_Banesco", "Pago_Movil"]
        "ad_created_at": (
            datetime.fromtimestamp(create_ms / 1000, tz=timezone.utc) if create_ms else None
        ),
        "price_type": adv.get("priceType") or "FIXED",
    }
```

---

## 4. Payment Methods — The Critical Column

### 4.1 Known Payment Method Identifiers for Venezuela

| `payType` value | `identifier` value | Common name |
|---|---|---|
| `BANESCO` | `Banco_Banesco` | Banesco bank transfer |
| `MERCANTIL` | `Banco_Mercantil` | Mercantil bank transfer |
| `PROVINCIAL` | `Banco_Provincial` | Banco Provincial (BBVA) |
| `VENEZUELA` | `Banco_De_Venezuela` | Banco de Venezuela (BDV) |
| `BANCO_NACIONAL_CREDITO` | `Banco_Nacional_De_Credito` | BNC |
| `SOFITASA` | `Sofitasa` | Sofitasa |
| `BANCAMIGA` | `Bancamiga` | Bancamiga |
| `BANCO_EXTERIOR` | `Banco_Exterior` | Banco Exterior |
| `BANCO_OCCIDENTE` | `Banco_Occidente` | Banco Occidental de Descuento (BOD) |
| `BANCO_PLATA` | `Banco_Plata` | Banco Plaza |
| `BANESCO_PERSONAL` | `Banesco_Personal` | Banesco personal account |
| `PAGO_MOVIL` | `Pago_Movil` | Mobile payment (inter-bank) |
| `BANCANET` | `Bancanet` | Bancanet |
| `BANPLUS` | `Banplus` | Banplus |
| `ZELLE` | `Zelle` | Zelle (USD, not VES) |
| `PAYPAL` | `Paypal` | PayPal (USD) |
| `CASH_VEF` | `Efectivo_VEF` | Cash in VES |
| `CASH_USD` | `Efectivo_USD` | Cash in USD |
| `PAGO_MOVIL` | `Pago_Movil_Banco_Venezuela` | Mobile payment at specific bank |

Treat this list as a starting point and confirm it against the first real snapshots (Section 8, Step 3). A single `payType` can map to several `identifier` values (`PAGO_MOVIL` appears twice above), so bank-level analysis should key on `identifier`.

### 4.2 Why This Matters for Bank Arbitrage

```python
# Example analysis query after ~1 week of data.
# For each snapshot, look for the best cross-bank path (labels follow Section 2.3):
#
# Cheapest SELL ad (you buy USDT, pay VES via Mercantil):           60.50 VES/USDT
# Highest-paying BUY ad (you sell USDT, receive VES via Banesco):   62.30 VES/USDT
# Gross arbitrage: 62.30 - 60.50 = 1.80 VES/USDT = ~3.0% of the 60.50 outlay
#
# If same bank: you lose 0% on internal transfer
# If different banks: you lose the interbank transfer fee (maybe 0.5%)
# Net profit = 3.0% - 0.5% = 2.5% per round trip
#
# If the cheapest SELL ad costs more than the best BUY ad pays, the difference
# is the spread you pay, not a profit. Per-bank prices show when it isn't.
```

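The per-snapshot search in the comments can be sketched over normalized rows. This is a minimal sketch under two assumptions. First, it uses the Section 2.3 labels: a SELL ad is where you buy USDT, and a BUY ad is where you sell it. Second, it ignores order limits, remaining quantity and fees. `best_by_method` and `best_cross_method` are hypothetical helpers.

```python
from itertools import product


def best_by_method(rows, trade_type, pick):
    """Best price per payment method for one side of one snapshot.

    pick is min for SELL ads (cheapest USDT to buy) and max for BUY ads
    (most VES received when selling USDT).
    """
    best = {}
    for row in rows:
        if row["trade_type"] != trade_type:
            continue
        for method in row["payment_methods"]:
            price = row["price"]
            best[method] = price if method not in best else pick(best[method], price)
    return best


def best_cross_method(rows):
    """Return (gross VES/USDT, pay_with, receive_via) for the best path, or None."""
    buy_usdt = best_by_method(rows, "SELL", min)   # you pay VES, get USDT
    sell_usdt = best_by_method(rows, "BUY", max)   # you give USDT, get VES
    candidates = [
        (sell_usdt[recv] - buy_usdt[pay], pay, recv)
        for pay, recv in product(buy_usdt, sell_usdt)
    ]
    return max(candidates) if candidates else None
```

A negative gross value means that, in that snapshot, no payment-method pair beats the spread. Same-bank pairs (`pay == recv`) are included because they avoid the interbank fee.
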

### 4.3 Storage Consideration

`payment_methods` is a **list of strings**. This is fine in Parquet, which stores it natively as a list (repeated) field. For CSV it would need to be JSON-encoded or one-hot encoded later.

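If a CSV export is ever needed, JSON-encoding the list columns keeps them reversible. This is a minimal standard-library sketch; `to_csv` and `LIST_COLUMNS` are hypothetical names.

```python
import csv
import io
import json

LIST_COLUMNS = ("payment_methods", "payment_method_ids")


def to_csv(rows, fieldnames):
    """Serialize normalized rows to CSV text, JSON-encoding the list columns."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames)
    writer.writeheader()
    for row in rows:
        out = {k: row.get(k) for k in fieldnames}
        for col in LIST_COLUMNS:
            if col in out:
                out[col] = json.dumps(out[col])  # ["A", "B"] -> '["A", "B"]'
        writer.writerow(out)
    return buf.getvalue()
```

Reading the file back only needs `json.loads` on those columns.
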

---

## 5. Storage — Exact File Layout

```
/path/to/data/
├── raw/
│   ├── buy_ads/
│   │   └── year=2026/
│   │       └── month=06/
│   │           └── day=05/
│   │               ├── snapshot_20260605_133000.parquet
│   │               ├── snapshot_20260605_133500.parquet
│   │               └── ...
│   ├── sell_ads/
│   │   └── year=2026/
│   │       └── month=06/
│   │           └── day=05/
│   │               ├── snapshot_20260605_133000.parquet
│   │               └── ...
│   └── daily_merged/          <-- OPTIONAL: daily combined view
│       └── year=2026/
│           └── month=06/
│               └── 2026-06-05.parquet
│
├── logs/
│   └── collector.log          <-- rotated by size (see logging in config.yaml)
│
├── alerts/                    <-- alert marker files go here
│   └── (empty if no issues)
│
└── checkpoint.json            <-- for restart resilience
```

### 5.1 File Naming Convention

**Snapshot files:** `snapshot_{YYYYMMDD}_{HHMMSS}.parquet`

- Time used: the start timestamp of the snapshot (UTC). The BUY and SELL files from the same cycle share this timestamp, so they pair up by name across `buy_ads/` and `sell_ads/`.
- Example: `snapshot_20260605_133000.parquet`

**Why no UUIDs?** The timestamp plus the `buy_ads/` or `sell_ads/` directory is already unique. Names can only repeat if you run two collectors at once (don't).

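The naming rule and the partition layout combine into one path function. This is a minimal sketch; `snapshot_path` is a hypothetical helper. Converting to UTC first guards against a local-time timestamp slipping in.

```python
from datetime import datetime, timezone
from pathlib import Path


def snapshot_path(base_dir, fetched_at: datetime) -> Path:
    """Partitioned destination for one snapshot, e.g.
    base_dir/year=2026/month=06/day=05/snapshot_20260605_133000.parquet
    """
    if fetched_at.tzinfo is None:
        raise ValueError("fetched_at must be timezone-aware")
    ts = fetched_at.astimezone(timezone.utc)
    return (
        Path(base_dir)
        / f"year={ts:%Y}"
        / f"month={ts:%m}"
        / f"day={ts:%d}"
        / f"snapshot_{ts:%Y%m%d_%H%M%S}.parquet"
    )
```
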

### 5.2 Atomic Writes (No Partial Files)

```python
import os
from pathlib import Path

import pandas as pd


def store_parquet(rows, base_dir, fetched_at):
    if not rows:
        return None

    # Build partition path from the (UTC) snapshot timestamp
    year = fetched_at.strftime("%Y")
    month = fetched_at.strftime("%m")
    day = fetched_at.strftime("%d")
    filename = f"snapshot_{fetched_at.strftime('%Y%m%d_%H%M%S')}.parquet"

    dest_dir = Path(base_dir) / f"year={year}" / f"month={month}" / f"day={day}"
    dest_dir.mkdir(parents=True, exist_ok=True)

    # Write to a temp file in the same directory (same filesystem) first.
    # Dot prefix: pyarrow dataset discovery skips it if a leftover survives a crash.
    tmp_path = dest_dir / f".{filename}.tmp"
    final_path = dest_dir / filename

    df = pd.DataFrame(rows)
    df.to_parquet(tmp_path, index=False, engine="pyarrow")

    with open(tmp_path, "r+b") as fh:  # flush contents to disk before the rename
        os.fsync(fh.fileno())

    # Atomic rename; Path.replace (os.replace) also overwrites on Windows, unlike Path.rename
    tmp_path.replace(final_path)
    return final_path
```

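### 5.3 Schema Consistency Check

Write a schema marker file once per partition directory, alongside the first snapshot stored there:

```python
import pyarrow as pa
import pyarrow.parquet as pq

# After the first successful write per partition, write _schema.parquet as a reference
schema_path = dest_dir / "_schema.parquet"
if not schema_path.exists():
    # Zero-row slice of the real table; df.iloc[:0] would type empty object columns as null
    table = pa.Table.from_pandas(df, preserve_index=False)
    pq.write_table(table.slice(0, 0), schema_path)
```

This allows downstream readers to discover the schema without reading a full snapshot. The leading underscore matters: pyarrow's dataset discovery, which `pd.read_parquet` uses on a directory, skips files whose names start with `_` or `.` by default, so the marker is never read as data.
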
---

## 6. Data Validation During Collection

### 6.1 Row-Level Rejection Rules

Reject (skip, don't crash) individual ads if:

| Condition | Why | Action |
|---|---|---|
| `price` is None or ≤ 0 | Bad data | Log warning, skip |
| `surplusAmount` is None or ≤ 0 | Ad has no USDT left | Log debug, skip |
| `monthFinishRate` is 0.0 and `monthOrderCount` > 0 | Advertiser had orders this month but completed none (suspicious) | Log warning, skip |
| `price` < 1.0 or `price` > 500.0 | Far outside the normal VES/USDT range (expected ~50–150; bounds are `price_min`/`price_max` in `config.yaml`) | Log warning, skip this ad |
| Empty `advNo` | Missing identifier | Log error, skip |
| Duplicate `advNo` within same snapshot | Possible API glitch | Log warning, keep first occurrence |

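The rules above can be applied in one pass over normalized rows, which use the Section 3.3 field names. This is a minimal sketch. `validate_rows` is a hypothetical name, and its keyword defaults mirror the `validation:` block of `config.yaml`. The first failing rule decides the log level.

```python
import logging

log = logging.getLogger("validator")


def validate_rows(rows, price_min=1.0, price_max=500.0,
                  reject_zero_finish_rate=True, reject_zero_surplus=True):
    """Apply the row-level rejection rules; return the rows that are kept."""
    kept, seen = [], set()
    for row in rows:
        adv_no, price = row.get("adv_no"), row.get("price")
        if not adv_no:
            log.error("skip: empty adv_no")
        elif price is None or price <= 0:
            log.warning("skip %s: bad price %r", adv_no, price)
        elif reject_zero_surplus and not (row.get("surplus_amount") or 0) > 0:
            log.debug("skip %s: no USDT left", adv_no)
        elif (reject_zero_finish_rate and row.get("month_finish_rate") == 0.0
              and (row.get("month_order_count") or 0) > 0):
            log.warning("skip %s: 0%% finish rate with orders", adv_no)
        elif not price_min <= price <= price_max:
            log.warning("skip %s: price %.2f outside [%s, %s]", adv_no, price, price_min, price_max)
        elif adv_no in seen:
            log.warning("skip %s: duplicate in snapshot", adv_no)
        else:
            seen.add(adv_no)  # keep the first occurrence only
            kept.append(row)
    return kept
```
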

### 6.2 Snapshot-Level Validation

After collecting all ads for one snapshot:

```
✅ TOTAL ADS:      BUY=47  SELL=53   (should be 20–200 each; 200 is the max_pages × 20 cap)
✅ PRICE RANGE:    BUY [54.20 – 62.80]   SELL [58.00 – 68.50]
                   (the best SELL price, i.e. the lowest, should sit above the best BUY price, i.e. the highest)
                   If not: LOG WARNING "BUY/SELL overlap detected"
✅ SPREAD:         SELL_min - BUY_max = 58.00 - 62.80 = -4.80
                   (If negative: the spread is inverted; unusual but possible)
                   Log: "Current spread: {spread:.2f} VES/USDT"
✅ MEDIAN PRICE:   BUY=58.30  SELL=63.50
✅ AD STALENESS:   0 ads with createTime older than 7 days
                   (If any: they're stale; still keep them, but log it)
✅ EMPTY SNAPSHOT: If BUY=0 AND SELL=0 → CRITICAL ALERT
```

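The checks above can be computed from the validated rows of both sides. This is a minimal sketch. `snapshot_checks` and its return shape are hypothetical, and the thresholds (20–200 ads, 7 days) come from the block above. `now` must follow the same timezone convention as `ad_created_at`.

```python
from datetime import datetime, timedelta
from statistics import median


def snapshot_checks(buy_rows, sell_rows, now, stale_after=timedelta(days=7)):
    """Compute the snapshot-level checks; returns (summary dict, list of warnings)."""
    if not buy_rows and not sell_rows:
        return {}, ["CRITICAL: empty snapshot (BUY=0 and SELL=0)"]
    warnings = []
    buy = [r["price"] for r in buy_rows]
    sell = [r["price"] for r in sell_rows]
    summary = {"buy_count": len(buy), "sell_count": len(sell)}
    for side, n in (("BUY", len(buy)), ("SELL", len(sell))):
        if not 20 <= n <= 200:
            warnings.append(f"{side} ad count {n} outside 20-200")
    if buy and sell:
        spread = min(sell) - max(buy)  # best SELL minus best BUY
        summary.update(spread=spread, buy_median=median(buy), sell_median=median(sell))
        if spread < 0:
            warnings.append(f"BUY/SELL overlap detected (spread {spread:.2f})")
    stale = sum(
        1 for r in buy_rows + sell_rows
        if r.get("ad_created_at") is not None and now - r["ad_created_at"] > stale_after
    )
    summary["stale_ads"] = stale
    if stale:
        warnings.append(f"{stale} ads older than {stale_after.days} days (kept)")
    return summary, warnings
```
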

### 6.3 Snapshot Summary Log Line (one line per snapshot)

```
2026-06-05 13:30:00 UTC | BUY=47 ads [54.20–62.80] SELL=53 ads [58.00–68.50] | spread= -4.80 | took 3.2s | methods=[BANESCO,PAGO_MOVIL,MERCANTIL,...]
```

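The summary line can be built from the same inputs. This is a minimal sketch. `summary_line` is a hypothetical helper, and truncating the method list to three entries plus `...` is an assumption about how the example line was abbreviated.

```python
from datetime import datetime


def summary_line(fetched_at, buy_prices, sell_prices, elapsed_s, methods, max_methods=3):
    """One-line snapshot summary in the format shown above."""
    def side(name, prices):
        if not prices:
            return f"{name}=0 ads"
        return f"{name}={len(prices)} ads [{min(prices):.2f}–{max(prices):.2f}]"

    spread = (f"{min(sell_prices) - max(buy_prices):.2f}"
              if buy_prices and sell_prices else "n/a")
    shown = ",".join(methods[:max_methods]) + (",..." if len(methods) > max_methods else "")
    return (
        f"{fetched_at:%Y-%m-%d %H:%M:%S} UTC | {side('BUY', buy_prices)} "
        f"{side('SELL', sell_prices)} | spread= {spread} | took {elapsed_s:.1f}s "
        f"| methods=[{shown}]"
    )
```
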

---

## 7. Scheduling & Lifecycle

### 7.1 Startup Behavior

```
1. Read checkpoint.json (if it exists)
   → "last_completed_snapshot": "2026-06-05T13:25:00Z"
   → Wait until (last_completed + interval) before starting
   → If checkpoint is missing or corrupted, start immediately

2. Verify data directory is writable
   → Try writing a test file, then delete it

3. Log: "Starting collector. Interval=300s. Pairs=USDT/VES"
```

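Step 1 of the startup sequence can be written as two small functions: wait out the rest of the interval, or start at once if the checkpoint is missing or corrupt. This is a minimal sketch. Both function names are hypothetical, and a timestamp without a timezone is assumed to be UTC.

```python
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path


def delay_from_checkpoint(text, interval_seconds, now):
    """Seconds until last_completed + interval; 0.0 if the text is unusable."""
    try:
        data = json.loads(text)
        last = datetime.fromisoformat(data["last_completed_snapshot"].replace("Z", "+00:00"))
    except (ValueError, KeyError, TypeError, AttributeError):
        return 0.0  # corrupted checkpoint -> start immediately
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    return max(0.0, (last + timedelta(seconds=interval_seconds) - now).total_seconds())


def initial_delay_seconds(checkpoint_path, interval_seconds, now=None):
    now = now or datetime.now(timezone.utc)
    try:
        text = Path(checkpoint_path).read_text()
    except OSError:
        return 0.0  # no checkpoint yet -> start immediately
    return delay_from_checkpoint(text, interval_seconds, now)
```
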
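
### 7.2 Graceful Shutdown

```python
import logging
import signal
import time

running = True


def handle_signal(sig, frame):
    global running
    logging.info("Received signal %s, finishing current snapshot...", sig)
    running = False


signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)


def interruptible_sleep(seconds):
    # time.sleep() resumes after a handled signal (PEP 475), so one 300 s sleep
    # would delay shutdown by up to 5 minutes. Sleep in 1 s slices instead.
    deadline = time.monotonic() + seconds
    while running:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(1.0, remaining))


# In main loop:
while running:
    # ... do snapshot ...
    # Write checkpoint after each successful snapshot ("Z" format, Section 7.3)
    write_checkpoint({"last_completed_snapshot": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")})
    interruptible_sleep(jitter(interval_seconds))
```
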
### 7.3 Checkpoint File Format

```json
{
  "last_completed_snapshot": "2026-06-05T13:30:00Z",
  "last_buy_ad_count": 47,
  "last_sell_ad_count": 53,
  "consecutive_failures": 0,
  "total_snapshots": 284,
  "first_snapshot": "2026-06-01T00:00:00Z",
  "version": "1.0"
}
```

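The checkpoint deserves the same write-then-rename treatment as the snapshot files. Otherwise a crash mid-write leaves corrupt JSON, which Section 7.1 then treats as a missing checkpoint. This is a minimal sketch. Unlike the one-argument call in Section 7.2, this hypothetical `write_checkpoint` takes the path explicitly. `iso_utc` is a hypothetical helper that produces the `Z`-suffixed timestamps shown above.

```python
import json
import os
from datetime import datetime, timezone
from pathlib import Path


def write_checkpoint(path, state: dict) -> None:
    """Atomically replace checkpoint.json so a crash never leaves half a file."""
    path = Path(path)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(state, indent=2))
    os.replace(tmp, path)  # atomic within the same filesystem


def iso_utc(ts: datetime) -> str:
    """Format like the checkpoint example: 2026-06-05T13:30:00Z."""
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
```
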

### 7.4 Alert Marker File

After 5 consecutive failures, write:

```
/path/to/data/alerts/20260605_133000_5_failures.alert
```

Content:

```json
{
  "timestamp": "2026-06-05T13:30:00Z",
  "error": "HTTP 500 after 3 retries",
  "consecutive_failures": 5,
  "traceback": "..."
}
```

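Here is a sketch of `write_alert_file` that produces the filename and JSON layout above. The signature (alert directory, exception, failure count, optional clock) is an assumption; the spec only names the function.

```python
import json
import traceback
from datetime import datetime, timezone
from pathlib import Path


def write_alert_file(alert_dir, exc: BaseException, consecutive_failures: int, now=None) -> Path:
    """Write <alert_dir>/<YYYYMMDD_HHMMSS>_<n>_failures.alert and return its path."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    alert_dir = Path(alert_dir)
    alert_dir.mkdir(parents=True, exist_ok=True)
    path = alert_dir / f"{now:%Y%m%d_%H%M%S}_{consecutive_failures}_failures.alert"
    payload = {
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "error": str(exc),
        "consecutive_failures": consecutive_failures,
        "traceback": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
    }
    path.write_text(json.dumps(payload, indent=2))
    return path
```
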

---

## 8. First-Run Verification Protocol

After the collector writes its **first snapshot**, the coder should manually verify:

### Step 1: Read the Parquet file back

```python
import pandas as pd

df = pd.read_parquet("data/raw/buy_ads/year=2026/month=06/day=05/snapshot_20260605_133000.parquet")
df.info()
df.head()
```

Check:

- [ ] All columns present (23 columns from spec)
- [ ] No null values in critical fields (price, adv_no, advertiser_no)
- [ ] `price` is float type, not string
- [ ] `fetched_at` is datetime type
- [ ] `payment_methods` is a proper list column

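### Step 2: Verify BUY vs SELL logic

The Step 1 file holds BUY ads only, so load the SELL file from the same cycle as well:

```python
import pandas as pd

pattern = "data/raw/{side}/year=2026/month=06/day=05/snapshot_20260605_133000.parquet"
df = pd.concat(
    [pd.read_parquet(pattern.format(side=side)) for side in ("buy_ads", "sell_ads")],
    ignore_index=True,
)

buy_ads = df[df["trade_type"] == "BUY"]
sell_ads = df[df["trade_type"] == "SELL"]

print(f"BUY ads count: {len(buy_ads)}")
print(f"SELL ads count: {len(sell_ads)}")
print(f"BUY price range: {buy_ads['price'].min():.2f} - {buy_ads['price'].max():.2f}")
print(f"SELL price range: {sell_ads['price'].min():.2f} - {sell_ads['price'].max():.2f}")
```

Expected: SELL prices are higher than BUY prices (an advertiser selling USDT charges a premium over one buying it). If BUY prices come out higher instead, the `tradeType` meanings in Section 2.3 are reversed. The stored data is still valid, because `trade_type` records the request value, but the BUY/SELL interpretation in Sections 4.2, 6.2 and 13 must be swapped.
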
### Step 3: Verify payment methods are captured

```python
all_methods = set()
for methods in df["payment_methods"]:
    all_methods.update(methods)
print(f"Payment methods found: {sorted(all_methods)}")
```

Expected: At least BANESCO and PAGO_MOVIL will appear. Possibly 5–15 different banks.

### Step 4: Verify advertiser diversity

```python
print(f"Unique advertisers: {df['advertiser_no'].nunique()}")
print(f"Merchants: {(df['advertiser_type'] == 'merchant').sum()}")
print(f"Users: {(df['advertiser_type'] == 'user').sum()}")
```

### Step 5: Run the collector for 1 hour (~12 snapshots) and verify:

```bash
ls data/raw/buy_ads/year=2026/month=06/day=05/snapshot_*.parquet | wc -l
# Should be ~12 (the glob skips _schema.parquet and temp files)
```

- [ ] No duplicate timestamps
- [ ] No gaps > 6 minutes (cycles are 300 s ± 30 s plus fetch time)
- [ ] No crash/restart in the logs

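Gaps are easiest to check from the filenames themselves. This is a minimal sketch. `snapshot_times` and `find_gaps` are hypothetical helpers, and the 360 s threshold is the 6-minute limit above. Duplicate timestamps are better checked on the `snapshot_id` column, because one directory cannot hold two files with the same name.

```python
from datetime import datetime
from pathlib import Path


def snapshot_times(names):
    """Parse snapshot_YYYYMMDD_HHMMSS.parquet names (or paths) into sorted datetimes."""
    return sorted(
        datetime.strptime(Path(n).name, "snapshot_%Y%m%d_%H%M%S.parquet") for n in names
    )


def find_gaps(names, max_gap_s=360):
    """Return (earlier, later) pairs of consecutive snapshots further apart than max_gap_s."""
    times = snapshot_times(names)
    return [(a, b) for a, b in zip(times, times[1:]) if (b - a).total_seconds() > max_gap_s]
```

For one day directory, pass it `Path("data/raw/buy_ads/year=2026/month=06/day=05").glob("snapshot_*.parquet")`.
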

---

## 9. File & Module Structure (Exact)

```
p2p-collector/
├── collect_p2p.py          # Entry point: argument parsing, run-mode selection
├── config.yaml             # All configurable settings
├── binance_client.py       # fetch_all_ads(), pagination, rate limiting
├── normalizer.py           # normalize_ad(), flatten schema
├── storage.py              # store_parquet(), atomic writes, checkpoint
├── validator.py            # validate_row(), validate_snapshot()
├── scheduler.py            # main loop, sleep/jitter, signal handling
├── alert.py                # write_alert_file(), logging setup
├── utils.py                # jitter(), datetime helpers
├── requirements.txt        # version-bounded dependencies
├── Makefile                # setup, run, clean, test commands
├── tests/
│   ├── test_normalizer.py  # Test with sample API response
│   ├── test_storage.py     # Test atomic writes
│   └── test_validator.py   # Test rejection rules
├── sample_responses/
│   ├── response_buy.json   # One real-ish API response for tests
│   └── response_sell.json
└── README.md               # Run instructions
```

### requirements.txt

```
httpx>=0.27,<1.0
pandas>=2.0,<3.0
pyarrow>=14.0,<16.0
pyyaml>=6.0,<7.0
```

Note: `httpx` over `requests` because it applies a default timeout to every request, whereas `requests` waits indefinitely unless `timeout=` is passed. It also has a cleaner API. Fall back to `requests` if the coder prefers, but always pass `timeout=`.

---

## 10. `config.yaml` — Complete Reference

```yaml
binance:
  base_url: "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search"
  user_agent: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
  timeout_seconds: 15
  max_pages: 10
  request_delay_seconds: 0.5

collection:
  pairs:
    - asset: "USDT"
      fiat: "VES"
  interval_seconds: 300
  output_dir: "./data/raw"
  retry_attempts: 3
  retry_delay_base_seconds: 10

validation:
  price_min: 1.0
  price_max: 500.0
  reject_zero_finish_rate: true
  reject_zero_surplus: true

logging:
  level: "INFO"
  file: "./data/logs/collector.log"
  max_bytes: 10485760  # 10 MB
  backup_count: 5
  format: "%(asctime)s | %(levelname)s | %(message)s"

alerts:
  consecutive_failure_threshold: 5
  alert_dir: "./data/alerts"
```

---

## 11. Run Modes

### Mode 1: One-shot test

```bash
python collect_p2p.py --once
```

- Fetches one BUY snapshot + one SELL snapshot
- Writes to disk
- Prints summary
- Exits
- Used for: first run, testing, debugging

### Mode 2: Daemon (continuous)

```bash
python collect_p2p.py
```

- Runs forever
- Loop with interval
- Graceful shutdown on Ctrl+C

### Mode 3: Backfill (future)

```bash
python collect_p2p.py --backfill --start=2026-06-01 --end=2026-06-03
```

- Not needed now
- Architecture supports it later, but the search endpoint returns only ads that are live at request time, so a backfill would need a separate historical source

### Mode 4: Validate-only

```bash
python collect_p2p.py --validate data/raw/buy_ads/year=2026/month=06/day=05/
```

- Reads Parquet files
- Runs validation checks
- Prints report
- No API calls

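The four modes fit one `argparse` parser. This sketch assumes the flags above are the complete set. Making the modes mutually exclusive, and treating "no flag" as daemon mode, are design choices; the spec does not state them.

```python
import argparse
from datetime import date


def parse_args(argv=None):
    p = argparse.ArgumentParser(prog="collect_p2p.py", description="Binance P2P collector")
    mode = p.add_mutually_exclusive_group()  # no flag at all -> daemon mode
    mode.add_argument("--once", action="store_true", help="one BUY + SELL snapshot, then exit")
    mode.add_argument("--validate", metavar="DIR", help="validate existing Parquet files; no API calls")
    mode.add_argument("--backfill", action="store_true", help="(future) backfill a date range")
    p.add_argument("--start", type=date.fromisoformat, help="backfill start, YYYY-MM-DD")
    p.add_argument("--end", type=date.fromisoformat, help="backfill end, YYYY-MM-DD")
    args = p.parse_args(argv)
    if args.backfill and not (args.start and args.end):
        p.error("--backfill requires --start and --end")
    return args
```
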

---

## 12. Testing the Coder's Work

Hand this checklist to the coder when they say "it's done":

| # | Test | How |
|---|---|---|
| 1 | **API connectivity** | `python collect_p2p.py --once` returns ads without error |
| 2 | **Pagination works** | Compare the total ads fetched with the `total` field from the API |
| 3 | **Both BUY and SELL** | Both directories have at least one file after `--once` |
| 4 | **Schema correct** | `pd.read_parquet(file)` → 23 columns, correct dtypes |
| 5 | **Payment methods populated** | At least 3 payment methods in the first snapshot |
| 6 | **Atomic write** | Kill the process mid-write (SIGKILL): no partial `.parquet` files remain, at most a leftover `.tmp` file |
| 7 | **Graceful shutdown** | Ctrl+C during a snapshot → clean exit, last snapshot saved |
| 8 | **Restart resilience** | Start collector, kill it, restart → resumes without duplicate timestamps |
| 9 | **Rate limiting** | No HTTP 429 in logs after 1 hour of continuous running |
| 10 | **Storage efficiency** | 1 hour of data ≤ 3 MB total on disk |

---

## 13. Post-Collection — What the Data Will Look Like After One Week

| Metric | Expected value |
|---|---|
| Snapshots collected | ~2,016 cycles (7 days × 288 cycles/day at 300 s), i.e. ~4,032 files (one BUY + one SELL per cycle) |
| Total raw ads | ~200,000–400,000 rows |
| Storage used | ~20–100 MB |
| Unique advertisers | 100–500 |
| Unique payment methods | 10–20 |
| Price range (BUY) | ~55–65 VES/USDT (fluctuates with the parallel dollar rate) |
| Price range (SELL) | ~58–70 VES/USDT |
| Typical spread | ~2–6 VES/USDT (3–10%) |

**After 1 week of collection, we stop and do EDA before any ML decisions.**

---

## 14. Known Gotchas / FAQ for the Coder

**Q: What if the API returns different fields than documented?**

A: The normalizer should use `.get()` with defaults for every optional field, and log a warning if a field we expected is missing. Don't crash. Ads missing required fields (`advNo`, `price`) are dropped by the row-level rules in Section 6.1.

**Q: What if `tradeMethods` is empty?**

A: Some ads have no payment methods listed. Store them as an empty list `[]` and continue. This is valid data.

**Q: What timezone should I use?**

A: **Everything in UTC.** The user is in VET (UTC-4), but all stored timestamps are UTC. Timezone conversion is only for display.

**Q: What if the VPS reboots?**

A: Run the collector as a systemd service that is enabled at boot (`systemctl enable`). Also set `Restart=always` so it comes back after crashes. On startup the collector reads the last checkpoint and continues after the appropriate delay (Section 7.1).

**Q: Should I use asyncio?**

A: No. Simple synchronous code. Each cycle is a few sequential requests followed by a 5-minute wait, so async provides zero benefit and adds complexity.

**Q: Can I use SQLite instead of Parquet?**

A: You could, but Parquet is more storage-efficient (columnar and compressed). It also loads directly into Pandas and Polars, and from there into ML frameworks such as PyTorch. Stick with Parquet.

---

*End of data collection spec. Hand this to the coding agent as the single source of truth.*