binance-p2p-market-history/base_plan.md
Gabriel Ramos 2c41a7a6b3 feat: implement binance p2p collector daemon
Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
2026-06-05 14:40:05 -04:00

Binance P2P Data Collection — Detailed Implementation Spec

Purpose: This document is the single source of truth for the data collection phase. Every field, every endpoint, every edge case is specified so a coder can implement without ambiguity.

Status: Phase 1 — Data Collection only. No ML. No trading. No algorithm decisions yet.


1. The Core Loop (Exact Pseudocode)

consecutive_failures = 0

while True:
    started = monotonic()
    now_utc = datetime.now(timezone.utc)      # one timestamp shared by both sides of the cycle
    try:
        buy_snap  = fetch_all_ads(trade_type="BUY",  asset="USDT", fiat="VES")
        sleep(1)                              # 1 s pause between BUY and SELL (section 2.5)
        sell_snap = fetch_all_ads(trade_type="SELL", asset="USDT", fiat="VES")
        
        flat_buy  = [normalize_ad(ad, "BUY",  now_utc) for ad in buy_snap]
        flat_sell = [normalize_ad(ad, "SELL", now_utc) for ad in sell_snap]
        
        validate_snapshot(flat_buy + flat_sell)
        
        # store_parquet() builds the year=/month=/day= partition itself (section 5.2)
        store_parquet(flat_buy,  base_path / "raw" / "buy_ads",  now_utc)
        store_parquet(flat_sell, base_path / "raw" / "sell_ads", now_utc)
        
        elapsed = monotonic() - started
        log_success(len(flat_buy), len(flat_sell), elapsed)
        consecutive_failures = 0              # any successful cycle resets the counter
        
    except Exception as e:
        consecutive_failures += 1
        log_error(e, consecutive_failures)
        if consecutive_failures >= 5:
            write_alert_file()           # human needs to check
    
    sleep(jitter(interval_seconds))       # default 300s ± 10%
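
The loop calls jitter() without defining it. A minimal sketch, assuming a uniform random offset of up to ±10% around the interval (the `fraction` parameter is an assumption, not part of this spec):

```python
import random


def jitter(interval_seconds: float, fraction: float = 0.10) -> float:
    """Return interval_seconds shifted by a uniform random offset of up to ±fraction."""
    offset = interval_seconds * fraction
    return interval_seconds + random.uniform(-offset, offset)


# With the default 300 s interval, every delay falls between 270 s and 330 s.
delay = jitter(300)
```

A uniform offset keeps the average cycle length at exactly the configured interval while preventing requests from landing on the same clock second every cycle.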

2. API Client — Exact Implementation

2.1 Endpoint

POST https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search

No API key. This is fully public.

2.2 Headers

| Header | Value |
|---|---|
| Content-Type | application/json |
| User-Agent | Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 |
| Accept | */* |
| Origin | https://p2p.binance.com |
| Referer | https://p2p.binance.com/ |
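
fetch_all_ads() in section 2.4 refers to URL and HEADERS without defining them. One way to express sections 2.1 and 2.2 as module-level constants (the constant names follow section 2.4; placing them in binance_client.py is an assumption):

```python
# Endpoint from section 2.1
URL = "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search"

# Header set from section 2.2
HEADERS = {
    "Content-Type": "application/json",
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "*/*",
    "Origin": "https://p2p.binance.com",
    "Referer": "https://p2p.binance.com/",
}
```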

2.3 Request Body (BUY example)

{
    "asset": "USDT",
    "fiat": "VES",
    "tradeType": "BUY",
    "page": 1,
    "rows": 20,
    "payTypes": [],
    "countries": [],
    "publisherType": null,
    "classify": "personal",
    "filter": {}
}

Key notes for the coder:

  • tradeType: "BUY" = advertiser wants to give you VES in exchange for your USDT. They are buying USDT from you.
  • tradeType: "SELL" = advertiser wants to give you USDT in exchange for your VES. They are selling USDT to you.
  • payTypes: [] = no filter, return all payment methods
  • rows: 20 = Binance's max per page (do not change)
  • publisherType: null = both merchants and regular users
  • classify: "personal" = personal ads (not business) — covers the P2P marketplace

2.4 Pagination Logic

import time

import httpx

# URL and HEADERS are the endpoint and header set from sections 2.1 and 2.2.


class APIError(Exception):
    """Raised when the API answers with success=false."""


def fetch_all_ads(trade_type, asset, fiat, max_pages=10, rows=20):
    all_ads = []
    
    for page in range(1, max_pages + 1):
        body = {
            "asset": asset,
            "fiat": fiat,
            "tradeType": trade_type,
            "page": page,
            "rows": rows,
            "payTypes": [],
            "countries": [],
            "publisherType": None,
            "classify": "personal",
            "filter": {}
        }
        
        resp = httpx.post(URL, json=body, headers=HEADERS, timeout=15)
        resp.raise_for_status()
        
        data = resp.json()
        
        if not data.get("success"):
            raise APIError(f"API returned success=false: {data}")
        
        ads = data.get("data") or []      # guards against a null "data" value
        total = data.get("total") or 0
        
        all_ads.extend(ads)
        
        # Stop on an empty page (avoids looping if "total" is wrong)
        # or once we've collected all available ads
        if not ads or len(all_ads) >= total:
            break
        
        # Don't request a page that starts beyond total ads
        if page * rows >= total:
            break
        
        if page < max_pages:
            time.sleep(0.5)     # 500ms between pages
    
    return all_ads

2.5 Rate Limiting — Defensive Strategy

| Event | Wait time | Notes |
|---|---|---|
| Between pages (same snapshot) | 500 ms | Fixed |
| Between snapshots (BUY → SELL) | 1 second | Fixed |
| Between full cycles | 300 s ± 30 s | Jittered to avoid clock sync |
| HTTP 429 (rate limited) | 60s → 120s → 240s → 480s | Exponential backoff, cap at 480s |
| Connection error | 30s retry | Transient network issues |
| 5xx server error | 60s retry | Binance server-side issues |

Important: After a 429, reset the backoff after one successful full snapshot.
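
The 429 schedule and its reset rule can be sketched as a small state holder. The class and method names here are illustrative, not part of this spec:

```python
BACKOFF_STEPS = (60, 120, 240, 480)  # seconds, from the table above


class Backoff429:
    """Tracks consecutive HTTP 429 responses and returns the next wait time."""

    def __init__(self, steps=BACKOFF_STEPS):
        self.steps = steps
        self.hits = 0

    def next_wait(self) -> int:
        """Return the wait for the current 429, then advance; stays at the last step (480 s)."""
        wait = self.steps[min(self.hits, len(self.steps) - 1)]
        self.hits += 1
        return wait

    def reset(self) -> None:
        """Call after one successful full snapshot."""
        self.hits = 0


backoff = Backoff429()
waits = [backoff.next_wait() for _ in range(6)]  # six 429s in a row
```

Keeping the counter outside fetch_all_ads() lets the main loop decide when a snapshot counts as successful and call reset() only then.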

2.6 Proxy Support (Optional — keep simple first)

Start with no proxy, direct from the VPS. Add proxy rotation only if we hit rate limits. One cycle every 5 minutes means at most 20 paged requests (2 sides × 10 pages), and Binance rarely rate-limits P2P at that volume.


3. Normalization — Exact Field Mapping

3.1 The Flattened Schema (one row = one ad)

| # | Output field | Type | JSON path | Notes |
|---|---|---|---|---|
| 1 | snapshot_id | string | auto: {fetch_ts_iso}_{trade_type} | e.g. "20260605T133000Z_BUY" |
| 2 | fetched_at | datetime | auto: now_utc | Always UTC |
| 3 | fetched_date | string | auto: YYYY-MM-DD | Partition column |
| 4 | trade_type | string | adv.tradeType | "BUY" or "SELL" |
| 5 | adv_no | string | adv.advNo | Unique ad ID |
| 6 | asset | string | adv.asset | "USDT" |
| 7 | fiat | string | adv.fiatUnit | "VES" |
| 8 | price | float | adv.price | Parse as float |
| 9 | surplus_amount | float | adv.surplusAmount | Remaining USDT |
| 10 | min_amount | float | adv.minSingleTransAmount | Min per trade. Assumed USDT, but see the unit note on field 11 |
| 11 | max_amount | float | adv.maxSingleTransAmount | Max per trade. Assumed USDT, but the 3.2 sample shows a max (5000.00) above the surplus (1520.43), so the unit may be VES; confirm against a live response |
| 12 | tradable_quantity | float | adv.tradableQuantity | Appears to duplicate surplus_amount (identical in the 3.2 sample); confirm during EDA |
| 13 | advertiser_no | string | advertiser.userNo | Stable ID, use this |
| 14 | advertiser_name | string | advertiser.nickName | For reference only |
| 15 | advertiser_type | string | advertiser.userType | "merchant" or "user" |
| 16 | month_order_count | int | advertiser.monthOrderCount | |
| 17 | month_finish_rate | float | advertiser.monthFinishRate | 0.0 to 1.0 |
| 18 | positive_rate | float | advertiser.positiveRate | 0.0 to 1.0 |
| 19 | user_positive_rate | float | advertiser.userPositiveRate | Older field, same idea |
| 20 | payment_methods | list[str] | adv.tradeMethods[].payType | e.g. ["BANESCO", "PAGO_MOVIL"] |
| 21 | payment_method_ids | list[str] | adv.tradeMethods[].identifier | e.g. ["Banco_Banesco", "Pago_Movil"] |
| 22 | ad_created_at | datetime | adv.createTime | Unix millisecond → datetime |
| 23 | price_type | string | adv.priceType | Usually "FIXED" |

3.2 JSON Path Details (nested structure)

The API response has this structure:

{
    "data": [
        {
            "adv": {
                "advNo": "6f8b2e...",
                "tradeType": "BUY",
                "asset": "USDT",
                "fiatUnit": "VES",
                "price": "58.50",
                "surplusAmount": "1520.43",
                "maxSingleTransAmount": "5000.00",
                "minSingleTransAmount": "100.00",
                "tradableQuantity": "1520.43",
                "createTime": 1749128400000,
                "fiatSymbol": "Bs",
                "priceType": "FIXED",
                "tradeMethods": [
                    {
                        "identifier": "Banco_Banesco",
                        "payType": "BANESCO",
                        "payMethodId": "BANESCO"
                    },
                    {
                        "identifier": "Pago_Movil",
                        "payType": "PAGO_MOVIL",
                        "payMethodId": "PAGO_MOVIL"
                    }
                ]
            },
            "advertiser": {
                "userNo": "ABC123",
                "nickName": "CryptoTraderVE",
                "userType": "merchant",
                "monthOrderCount": 342,
                "monthFinishRate": 0.97,
                "positiveRate": 0.99,
                "userPositiveRate": 0.99
            }
        }
    ],
    "total": 156,
    "pageSize": 20,
    "success": true
}

3.3 Normalization Code Sketch

from datetime import datetime, timezone


def _to_float(value, default=None):
    """The API sends numbers as strings; return default when missing or unparsable."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def normalize_ad(raw_ad: dict, trade_type: str, fetched_at: datetime) -> dict:
    # Never raise on a missing field: bad rows are rejected later by the rules in 6.1
    adv = raw_ad.get("adv") or {}
    adver = raw_ad.get("advertiser") or {}
    methods = adv.get("tradeMethods") or []
    create_ms = adv.get("createTime")
    
    return {
        "snapshot_id": f"{fetched_at.strftime('%Y%m%dT%H%M%SZ')}_{trade_type}",
        "fetched_at": fetched_at,
        "fetched_date": fetched_at.strftime("%Y-%m-%d"),
        "trade_type": trade_type,
        "adv_no": adv.get("advNo"),
        "asset": adv.get("asset"),
        "fiat": adv.get("fiatUnit"),
        "price": _to_float(adv.get("price")),            # None if missing → skipped in 6.1
        "surplus_amount": _to_float(adv.get("surplusAmount"), 0.0),
        "min_amount": _to_float(adv.get("minSingleTransAmount"), 0.0),
        "max_amount": _to_float(adv.get("maxSingleTransAmount"), 0.0),
        "tradable_quantity": _to_float(adv.get("tradableQuantity"), 0.0),
        "advertiser_no": adver.get("userNo"),
        "advertiser_name": adver.get("nickName"),
        "advertiser_type": adver.get("userType", "user"),
        "month_order_count": adver.get("monthOrderCount") or 0,
        "month_finish_rate": _to_float(adver.get("monthFinishRate"), 0.0),
        "positive_rate": _to_float(adver.get("positiveRate"), 0.0),
        "user_positive_rate": _to_float(adver.get("userPositiveRate"), 0.0),
        "payment_methods": [m.get("payType") for m in methods],        # e.g. ["BANESCO", "PAGO_MOVIL"]
        "payment_method_ids": [m.get("identifier") for m in methods],  # e.g. ["Banco_Banesco", "Pago_Movil"]
        "ad_created_at": (
            datetime.fromtimestamp(create_ms / 1000, tz=timezone.utc)
            if create_ms is not None else None
        ),
        "price_type": adv.get("priceType", "FIXED"),
    }

4. Payment Methods — The Critical Column

4.1 Known Payment Method Identifiers for Venezuela

| payType value | identifier value | Common name |
|---|---|---|
| BANESCO | Banco_Banesco | Banesco bank transfer |
| MERCANTIL | Banco_Mercantil | Mercantil bank transfer |
| PROVINCIAL | Banco_Provincial | Banco Provincial (BBVA) |
| VENEZUELA | Banco_De_Venezuela | Banco de Venezuela (BDV) |
| BANCO_NACIONAL_CREDITO | Banco_Nacional_De_Credito | BNC |
| SOFITASA | Sofitasa | Sofitasa |
| BANCAMIGA | Bancamiga | Bancamiga |
| BANCO_EXTERIOR | Banco_Exterior | Banco Exterior |
| BANCO_OCCIDENTE | Banco_Occidente | Banco Occidental de Descuento (BOD) |
| BANCO_PLATA | Banco_Plata | Banco Plaza |
| BANESCO_PERSONAL | Banesco_Personal | Banesco personal account |
| PAGO_MOVIL | Pago_Movil | Mobile payment (inter-bank) |
| BANCANET | Bancanet | Bancanet |
| BANPLUS | Banplus | Banplus |
| ZELLE | Zelle | Zelle (USD, not VES) |
| PAYPAL | Paypal | PayPal (USD) |
| CASH_VEF | Efectivo_VEF | Cash in VES |
| CASH_USD | Efectivo_USD | Cash in USD |
| PAGO_MOVIL | Pago_Movil_Banco_Venezuela | Mobile payment at specific bank |

4.2 Why This Matters for Bank Arbitrage

# Example analysis query after ~1 week of data:
# For each snapshot, find the best path:
#
# Best BUY price (sell USDT → get VES):          Banesco, 60.50 VES/USDT
# Best SELL price (buy USDT → give VES):         Mercantil, 62.30 VES/USDT
# Gross arbitrage: 62.30 - 60.50 = 1.80 VES/USDT = ~2.9% spread
#
# If same bank: you lose 0% on internal transfer
# If different banks: you lose bank transfer fee (maybe 0.5%)
# Net profit = 2.9% - 0.5% = 2.4% per round trip
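
A per-bank version of that query can be sketched in plain Python over normalized rows. It follows this spec's reading of tradeType (BUY ads pay us VES, so higher is better; SELL ads charge us VES, so lower is better). The function name and the sample rows are made up for illustration:

```python
from collections import defaultdict


def best_prices_by_method(rows):
    """Map payment method -> {"BUY": highest BUY price, "SELL": lowest SELL price}."""
    best = defaultdict(dict)
    for row in rows:
        side = row["trade_type"]
        for method in row["payment_methods"]:
            current = best[method].get(side)
            if current is None:
                better = True
            elif side == "BUY":
                better = row["price"] > current
            else:
                better = row["price"] < current
            if better:
                best[method][side] = row["price"]
    return dict(best)


# Illustrative rows only, not real market data
sample_rows = [
    {"trade_type": "BUY", "price": 60.50, "payment_methods": ["BANESCO"]},
    {"trade_type": "BUY", "price": 59.90, "payment_methods": ["BANESCO", "MERCANTIL"]},
    {"trade_type": "SELL", "price": 62.30, "payment_methods": ["MERCANTIL"]},
    {"trade_type": "SELL", "price": 63.00, "payment_methods": ["MERCANTIL", "BANESCO"]},
]
best = best_prices_by_method(sample_rows)
```

An ad that lists several payment methods counts toward each of them, which is why payment_methods is stored as a list rather than a single value.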

4.3 Storage Consideration

payment_methods is a list of strings — this is fine in Parquet (stored as a repeated field). For CSV it would need to be JSON-encoded or one-hot encoded later.


5. Storage — Exact File Layout

/path/to/data/
├── raw/
│   ├── buy_ads/
│   │   └── year=2026/
│   │       └── month=06/
│   │           └── day=05/
│   │               ├── snapshot_20260605_133000.parquet
│   │               ├── snapshot_20260605_133500.parquet
│   │               └── ...
│   ├── sell_ads/
│   │   └── year=2026/
│   │       └── month=06/
│   │           └── day=05/
│   │               ├── snapshot_20260605_133000.parquet
│   │               └── ...
│   └── daily_merged/            <-- OPTIONAL: daily combined view
│       └── year=2026/
│           └── month=06/
│               └── 2026-06-05.parquet
│
├── logs/
│   └── collector_20260605.log
│
├── alerts/                       <-- alert marker files go here
│   └── (empty if no issues)
│
└── checkpoint.json               <-- for restart resilience

5.1 File Naming Convention

Snapshot files: snapshot_{YYYYMMDD}_{HHMMSS}.parquet

  • Time used: the start timestamp of the snapshot (UTC)
  • Example: snapshot_20260605_133000.parquet

Why no UUIDs? The timestamp + trade_type partition is already unique. No repeated names unless you run two collectors (don't).

5.2 Atomic Writes (No Partial Files)

import os
from pathlib import Path

import pandas as pd


def store_parquet(rows, base_dir, fetched_at):
    if not rows:
        return
    
    # Build partition path from timestamp
    year = fetched_at.strftime("%Y")
    month = fetched_at.strftime("%m")
    day = fetched_at.strftime("%d")
    filename = f"snapshot_{fetched_at.strftime('%Y%m%d_%H%M%S')}.parquet"
    
    dest_dir = Path(base_dir) / f"year={year}" / f"month={month}" / f"day={day}"
    dest_dir.mkdir(parents=True, exist_ok=True)
    
    # Write to temp file first (same directory, so the rename stays on one filesystem)
    tmp_path = dest_dir / (filename + ".tmp")
    final_path = dest_dir / filename
    
    df = pd.DataFrame(rows)
    df.to_parquet(tmp_path, index=False, engine="pyarrow")
    
    # Atomic rename: readers see either no file or the complete file
    os.replace(tmp_path, final_path)

5.3 Schema Consistency Check

Each day partition should contain a schema marker file, written once:

# After the first successful write in a partition, write _schema.parquet as a reference
schema_path = dest_dir / "_schema.parquet"
if not schema_path.exists():
    df.iloc[:0].to_parquet(schema_path, index=False, engine="pyarrow")  # zero rows, same columns

This allows downstream readers to discover the schema without reading a full snapshot.


6. Data Validation During Collection

6.1 Row-Level Rejection Rules

Reject (skip, don't crash) individual ads if:

| Condition | Why | Action |
|---|---|---|
| price is None or ≤ 0 | Bad data | Log warning, skip |
| surplusAmount is None or ≤ 0 | Ad has no USDT left | Log debug, skip |
| monthFinishRate is 0.0 and monthOrderCount > 0 | Merchant hasn't completed any orders (suspicious) | Log warning, skip |
| price < 1.0 or price > 500.0 | Way outside VES/USDT normal range (should be ~50–150) | Log warning, skip this ad |
| Empty advNo | Missing identifier | Log error, skip |
| Duplicate advNo within same snapshot | Possible API glitch | Log warning, keep first occurrence |
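
A minimal sketch of these rules as a single function over normalized rows (snake_case fields from section 3.1). The (keep, reason) return shape and the `seen_adv_nos` parameter are assumptions; logging at the level named in the table is left to the caller:

```python
def validate_row(row, seen_adv_nos, price_min=1.0, price_max=500.0):
    """Return (keep, reason). seen_adv_nos is a set shared across one snapshot."""
    adv_no = row.get("adv_no")
    if not adv_no:
        return False, "empty advNo"
    if adv_no in seen_adv_nos:
        return False, "duplicate advNo"
    seen_adv_nos.add(adv_no)  # the first occurrence becomes the reference

    price = row.get("price")
    if price is None or price <= 0:
        return False, "bad price"
    if price < price_min or price > price_max:
        return False, "price out of range"

    surplus = row.get("surplus_amount")
    if surplus is None or surplus <= 0:
        return False, "no surplus"

    orders = row.get("month_order_count") or 0
    if row.get("month_finish_rate") == 0.0 and orders > 0:
        return False, "zero finish rate"

    return True, ""


# Illustrative rows only
good = {"adv_no": "A1", "price": 58.5, "surplus_amount": 1520.43,
        "month_finish_rate": 0.97, "month_order_count": 342}
seen = set()
first = validate_row(good, seen)
second = validate_row(good, seen)  # same advNo again in the same snapshot
```

Returning a reason instead of raising keeps the "skip, don't crash" rule: the caller filters the list and decides how loudly to log each reason.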

6.2 Snapshot-Level Validation

After collecting all ads for one snapshot:

✅ TOTAL ADS:         BUY=47  SELL=53  (should be 20-200 each)
✅ PRICE RANGE:       BUY  [54.20 - 62.80]  SELL [58.00 - 68.50]
   (SELL should be consistently higher than BUY)
   If not: LOG WARNING "BUY/SELL overlap detected"
✅ SPREAD:            SELL_min - BUY_max = 58.00 - 62.80 = -4.80
   (If negative: spread is inverted — unusual but possible)
   Log: "Current spread: {spread:.2f} VES/USDT"
✅ MEDIAN PRICE:      BUY=58.30  SELL=63.50
✅ AD STALENESS:      0 ads with createTime > 7 days old
   (If any: they're stale, still keep them, but log it)
✅ EMPTY SNAPSHOT:    If BUY=0 AND SELL=0 → CRITICAL ALERT
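
The numeric checks above reduce to a few aggregates per side. A sketch, assuming the function receives the already-filtered price lists; the function name and returned keys are illustrative:

```python
from statistics import median


def summarize_snapshot(buy_prices, sell_prices):
    """Compute the per-snapshot aggregates used by the checks above."""
    if not buy_prices and not sell_prices:
        raise ValueError("empty snapshot: BUY=0 and SELL=0")  # CRITICAL ALERT case

    summary = {"buy_count": len(buy_prices), "sell_count": len(sell_prices), "spread": None}
    if buy_prices:
        summary.update(buy_min=min(buy_prices), buy_max=max(buy_prices),
                       buy_median=median(buy_prices))
    if sell_prices:
        summary.update(sell_min=min(sell_prices), sell_max=max(sell_prices),
                       sell_median=median(sell_prices))
    if buy_prices and sell_prices:
        # Spread as defined above: lowest SELL price minus highest BUY price
        summary["spread"] = min(sell_prices) - max(buy_prices)
    return summary


# Three prices per side, chosen to reproduce the ranges and medians shown above
summary = summarize_snapshot([54.20, 58.30, 62.80], [58.00, 63.50, 68.50])
```

A negative spread here means the price ranges overlap, which is the case the "BUY/SELL overlap detected" warning is meant to flag.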

6.3 Snapshot Summary Log Line (one line per snapshot)

2026-06-05 13:30:00 UTC | BUY=47 ads [54.20–62.80] SELL=53 ads [58.00–68.50] | spread= -4.80 | took 3.2s | methods=[BANESCO,PAGO_MOVIL,MERCANTIL,...]

7. Scheduling & Lifecycle

7.1 Startup Behavior

1. Read checkpoint.json (if exists)
   → "last_completed_snapshot": "2026-06-05T13:25:00Z"
   → Wait until (last_completed + interval) before starting
   → If checkpoint is missing or corrupted, start immediately

2. Verify data directory is writable
   → Try writing a test file, then delete it

3. Log: "Starting collector. Interval=300s. Pairs=USDT/VES"
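
Step 1 can be sketched as a pure function that turns a loaded checkpoint into a startup delay. The function name is hypothetical; a missing or corrupted checkpoint is represented here by `None` or by a dict without a usable timestamp:

```python
from datetime import datetime, timedelta, timezone


def seconds_until_next_run(checkpoint, interval_seconds, now):
    """Seconds to wait before the first snapshot; 0 if the checkpoint is unusable."""
    try:
        raw = checkpoint["last_completed_snapshot"]
        # Accept the trailing "Z" used in checkpoint.json on any Python 3 version
        last = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except (KeyError, TypeError, ValueError, AttributeError):
        return 0.0  # missing or corrupted: start immediately
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)  # stored timestamps are UTC
    remaining = (last + timedelta(seconds=interval_seconds) - now).total_seconds()
    return max(0.0, remaining)


now = datetime(2026, 6, 5, 13, 27, 0, tzinfo=timezone.utc)
wait = seconds_until_next_run(
    {"last_completed_snapshot": "2026-06-05T13:25:00Z"}, 300, now
)
```

In this example the last snapshot finished two minutes before `now`, so three minutes of the five-minute interval remain.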

7.2 Graceful Shutdown

import logging
import signal

running = True

def handle_signal(sig, frame):
    global running
    logging.info("Received signal %s, finishing current snapshot...", sig)
    running = False

signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)

# In main loop:
while running:
    # ... do snapshot ...
    # Write checkpoint after each successful snapshot
    write_checkpoint({"last_completed_snapshot": now_utc.isoformat()})
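
One caveat with a plain boolean flag: a blocking sleep of up to 330 s delays shutdown until the sleep ends. A possible variant, assuming `threading.Event` replaces the flag so the wait between cycles can be interrupted (the function names are illustrative):

```python
import signal
import threading

stop_event = threading.Event()


def handle_signal(sig, frame):
    """Ask the loop to stop; the snapshot in progress still finishes."""
    stop_event.set()


def install_handlers():
    """Register the handlers; call once from the main thread at startup."""
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)


def run_loop(do_snapshot, next_delay):
    """Run snapshots until a stop is requested; the wait between cycles is interruptible."""
    while not stop_event.is_set():
        do_snapshot()
        # Returns early as soon as stop_event is set, instead of sleeping the full delay
        stop_event.wait(timeout=next_delay())
```

Usage would look like `install_handlers()` followed by `run_loop(collect_once, lambda: jitter(300))`, where `collect_once` is a hypothetical function wrapping one cycle of the core loop.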

7.3 Checkpoint File Format

{
    "last_completed_snapshot": "2026-06-05T13:30:00Z",
    "last_buy_ad_count": 47,
    "last_sell_ad_count": 53,
    "consecutive_failures": 0,
    "total_snapshots": 284,
    "first_snapshot": "2026-06-01T00:00:00Z",
    "version": "1.0"
}
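
The checkpoint deserves the same atomic-write treatment as the Parquet files, since a crash mid-write would otherwise leave a corrupted file. A sketch, assuming write_checkpoint() takes the path explicitly (the spec's call passes only the state) and that load_checkpoint() is a new helper:

```python
import json
import os
from pathlib import Path


def write_checkpoint(path, state: dict) -> None:
    """Write the checkpoint to a temp file, then swap it into place atomically."""
    path = Path(path)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(state, indent=4), encoding="utf-8")
    os.replace(tmp, path)


def load_checkpoint(path):
    """Return the checkpoint dict, or None if the file is missing or corrupted."""
    try:
        data = json.loads(Path(path).read_text(encoding="utf-8"))
    except (OSError, ValueError):  # ValueError covers invalid JSON
        return None
    return data if isinstance(data, dict) else None
```

Returning `None` for every failure mode keeps the startup rule simple: no usable checkpoint means start immediately.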

7.4 Alert Marker File

After 5 consecutive failures, write:

/path/to/data/alerts/20260605_133000_5_failures.alert

Content:

{
    "timestamp": "2026-06-05T13:30:00Z",
    "error": "HTTP 500 after 3 retries",
    "consecutive_failures": 5,
    "traceback": "..."
}
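
A sketch of write_alert_file() that produces this file name and payload. The parameters are assumptions, since the core loop calls the function without arguments:

```python
import json
import traceback
from datetime import datetime, timezone
from pathlib import Path


def write_alert_file(alert_dir, error, consecutive_failures, now=None):
    """Write one .alert marker file and return its path."""
    now = now or datetime.now(timezone.utc)
    alert_dir = Path(alert_dir)
    alert_dir.mkdir(parents=True, exist_ok=True)

    name = f"{now.strftime('%Y%m%d_%H%M%S')}_{consecutive_failures}_failures.alert"
    payload = {
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "error": str(error),
        "consecutive_failures": consecutive_failures,
        "traceback": "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        ),
    }
    path = alert_dir / name
    path.write_text(json.dumps(payload, indent=4), encoding="utf-8")
    return path
```

Because the failure count is part of the file name, a sixth or seventh consecutive failure produces a new file rather than overwriting the first alert.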

8. First-Run Verification Protocol

After the collector writes its first snapshot, the coder should manually verify:

Step 1: Read the Parquet file back

import pandas as pd
df = pd.read_parquet("data/raw/buy_ads/year=2026/month=06/day=05/snapshot_20260605_133000.parquet")
df.info()
df.head()

Check:

  • All columns present (23 columns from spec)
  • No null values in critical fields (price, adv_no, advertiser_no)
  • price is float type, not string
  • fetched_at is datetime type
  • payment_methods is a proper list column

Step 2: Verify BUY vs SELL logic

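# df from Step 1 holds BUY ads only; SELL ads are stored in their own directory
buy_ads = df
sell_ads = pd.read_parquet("data/raw/sell_ads/year=2026/month=06/day=05/snapshot_20260605_133000.parquet")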

print(f"BUY ads count:  {len(buy_ads)}")
print(f"SELL ads count: {len(sell_ads)}")
print(f"BUY price range:  {buy_ads['price'].min():.2f} - {buy_ads['price'].max():.2f}")
print(f"SELL price range: {sell_ads['price'].min():.2f} - {sell_ads['price'].max():.2f}")

Expected: SELL prices are higher than BUY prices (advertiser selling USDT charges a premium vs. buying USDT).

Step 3: Verify payment methods are captured

all_methods = set()
for methods in df["payment_methods"]:
    all_methods.update(methods)
print(f"Payment methods found: {sorted(all_methods)}")

Expected: At least BANESCO and PAGO_MOVIL will appear. Possibly 5–15 different banks.

Step 4: Verify advertiser diversity

print(f"Unique advertisers: {df['advertiser_no'].nunique()}")
print(f"Merchants: {(df['advertiser_type'] == 'merchant').sum()}")
print(f"Users: {(df['advertiser_type'] == 'user').sum()}")

Step 5: Run the collector for 1 hour (~12 snapshots) and verify:

ls data/raw/buy_ads/year=2026/month=06/day=05/ | wc -l
# Should be ~12
  • No duplicate timestamps
  • No gaps > 6 minutes
  • No crash/restart in the logs
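
The gap check can be automated from the file names alone, since each name carries the snapshot start time. A sketch with a hypothetical helper name; it relies only on the naming convention from section 5.1:

```python
from datetime import datetime, timedelta


def find_gaps(filenames, max_gap=timedelta(minutes=6)):
    """Return (count, gaps), where gaps lists consecutive timestamps further apart than max_gap."""
    stamps = sorted(
        datetime.strptime(name, "snapshot_%Y%m%d_%H%M%S.parquet")
        for name in filenames
    )
    gaps = [(a, b) for a, b in zip(stamps, stamps[1:]) if b - a > max_gap]
    return len(stamps), gaps


# Illustrative names: the third snapshot arrives 12 minutes after the second
count, gaps = find_gaps([
    "snapshot_20260605_133000.parquet",
    "snapshot_20260605_133500.parquet",
    "snapshot_20260605_134700.parquet",
])
```

To run it against a real partition, pass `p.name for p in Path(partition_dir).glob("snapshot_*.parquet")`, which also skips any non-snapshot files in the directory.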

9. File & Module Structure (Exact)

p2p-collector/
├── collect_p2p.py           # Entry point: argument parsing, main loop
├── config.yaml              # All configurable settings
├── binance_client.py        # fetch_all_ads(), pagination, rate limiting
├── normalizer.py            # normalize_ad(), flatten schema
├── storage.py               # store_parquet(), atomic writes, checkpoint
├── validator.py             # validate_row(), validate_snapshot()
├── scheduler.py             # main loop, sleep/jitter, signal handling
├── alert.py                 # write_alert_file(), logging setup
├── utils.py                 # jitter(), datetime helpers
├── requirements.txt         # pinned versions
├── Makefile                 # setup, run, clean, test commands
├── tests/
│   ├── test_normalizer.py   # Test with sample API response
│   ├── test_storage.py      # Test atomic writes
│   └── test_validator.py    # Test rejection rules
├── sample_responses/
│   ├── response_buy.json    # One real-ish API response for tests
│   └── response_sell.json
└── README.md                # Run instructions

requirements.txt

httpx>=0.27,<1.0
pandas>=2.0,<3.0
pyarrow>=14.0,<16.0
pyyaml>=6.0,<7.0

Note: httpx is preferred over requests because it applies a timeout by default (requests waits indefinitely unless a timeout is passed) and has a cleaner API. Fall back to requests if the coder prefers.


10. config.yaml — Complete Reference

binance:
  base_url: "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search"
  user_agent: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
  timeout_seconds: 15
  max_pages: 10
  request_delay_seconds: 0.5

collection:
  pairs:
    - asset: "USDT"
      fiat: "VES"
  interval_seconds: 300
  output_dir: "./data/raw"
  retry_attempts: 3
  retry_delay_base_seconds: 10

validation:
  price_min: 1.0
  price_max: 500.0
  reject_zero_finish_rate: true
  reject_zero_surplus: true

logging:
  level: "INFO"
  file: "./data/logs/collector.log"
  max_bytes: 10485760  # 10 MB
  backup_count: 5
  format: "%(asctime)s | %(levelname)s | %(message)s"

alerts:
  consecutive_failure_threshold: 5
  alert_dir: "./data/alerts"

11. Run Modes

Mode 1: One-shot test

python collect_p2p.py --once
  • Fetches one BUY snapshot + one SELL snapshot
  • Writes to disk
  • Prints summary
  • Exits
  • Used for: first run, testing, debugging

Mode 2: Daemon (continuous)

python collect_p2p.py
  • Runs forever
  • Loop with interval
  • Graceful shutdown on Ctrl+C

Mode 3: Backfill (future)

python collect_p2p.py --backfill --start=2026-06-01 --end=2026-06-03
  • Not needed now
  • Architecture supports it later

Mode 4: Validate-only

python collect_p2p.py --validate data/raw/buy_ads/year=2026/month=06/day=05/
  • Reads Parquet files
  • Runs validation checks
  • Prints report
  • No API calls
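
The four modes map onto a small argparse setup. A sketch under the assumption that the modes are mutually exclusive and that running with no flag means daemon mode; the help strings are illustrative:

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Command-line interface for collect_p2p.py; no mode flag means daemon mode."""
    parser = argparse.ArgumentParser(prog="collect_p2p.py")
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--once", action="store_true",
                      help="fetch one BUY and one SELL snapshot, then exit")
    mode.add_argument("--validate", metavar="DIR",
                      help="validate existing Parquet files, no API calls")
    mode.add_argument("--backfill", action="store_true",
                      help="reserved for future use")
    parser.add_argument("--start", help="backfill start date, YYYY-MM-DD")
    parser.add_argument("--end", help="backfill end date, YYYY-MM-DD")
    return parser


args = build_parser().parse_args(["--once"])
```

Grouping the mode flags makes argparse reject combinations such as `--once --validate DIR` with a usage error before any collection code runs.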

12. Testing the Coder's Work

Hand this checklist to the coder when they say "it's done":

| # | Test | How |
|---|---|---|
| 1 | API connectivity | python collect_p2p.py --once returns ads without error |
| 2 | Pagination works | Inspect: total ads fetched vs total field from API |
| 3 | Both BUY and SELL | Both directories have at least one file after --once |
| 4 | Schema correct | pd.read_parquet(file) → 23 columns, correct dtypes |
| 5 | Payment methods populated | At least 3 payment methods in the first snapshot |
| 6 | Atomic write | Kill the process mid-write (SIGKILL): no partial .parquet files remain, only leftover .tmp files |
| 7 | Graceful shutdown | Ctrl+C during a snapshot → clean exit, last snapshot saved |
| 8 | Restart resilience | Start collector, kill it, restart → resumes without duplicate timestamps |
| 9 | Rate limiting | No HTTP 429 in logs after 1 hour of continuous running |
| 10 | Storage efficiency | 1 hour of data ≤ 3 MB total on disk |

13. Post-Collection — What the Data Will Look Like After One Week

| Metric | Expected value |
|---|---|
| Snapshots collected | ~2,016 (7 days × 288 snapshots/day) |
| Total raw ads | ~200,000–400,000 rows |
| Storage used | ~20–100 MB |
| Unique advertisers | 100–500 |
| Unique payment methods | 10–20 |
| Price range (BUY) | ~55–65 VES/USDT (fluctuates with parallel dollar) |
| Price range (SELL) | ~58–70 VES/USDT |
| Typical spread | ~2–6 VES/USDT (3–10%) |
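
The snapshot count and the lower end of the row estimate follow directly from the interval. A quick check, using the BUY=47 / SELL=53 sample counts from section 6.2 as the assumed ads per cycle:

```python
interval_seconds = 300

snapshots_per_day = 24 * 60 * 60 // interval_seconds  # 288
snapshots_per_week = 7 * snapshots_per_day             # 2016

ads_per_cycle = 47 + 53                                # sample counts from section 6.2
rows_per_week = snapshots_per_week * ads_per_cycle     # 201600
```

About 100 ads per cycle lands at the bottom of the expected row range; cycles that return closer to 200 ads would land at the top.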

After 1 week of collection, we stop and do EDA before any ML decisions.


14. Known Gotchas / FAQ for the Coder

Q: What if the API returns different fields than documented? A: The normalizer should use .get() with defaults for every field. Log a warning if a field is missing that we expected. Don't crash.

Q: What if tradeMethods is empty? A: Some ads have no payment methods listed. Store as empty list []. Continue. This is valid data.

Q: What timezone should I use? A: Everything in UTC. The user is in VET (UTC-4), but all stored timestamps are UTC. Timezone conversion is only for display.

Q: What if the VPS reboots? A: Run the collector as a systemd service that is enabled at boot, with Restart=always to cover crashes. On start, the collector reads the last checkpoint and continues after the appropriate delay.

Q: Should I use asyncio? A: No. Simple synchronous code. Each cycle makes at most 20 short requests and then sleeps for about 5 minutes, so async provides no benefit and adds complexity.

Q: Can I use SQLite instead of Parquet? A: You could, but Parquet is more storage-efficient and loads directly into pandas and Polars for the later analysis and ML work. Stick with Parquet.


End of data collection spec. Hand this to the coding agent as the single source of truth.