Binance P2P Data Collector
This tool continuously collects public peer-to-peer (P2P) market advertisements from Binance P2P for Venezuela (VES/USDT). It normalizes and validates each snapshot, then writes it atomically to date-partitioned Parquet files for later exploratory data analysis and arbitrage modeling.
Project Structure
p2p-collector/
├── collect_p2p.py # Entry point: argument parsing, validation/daemon modes
├── config.yaml # Application configuration (endpoints, delays, validation limits)
├── binance_client.py # HTTP client, pagination logic, retry, and 429 backoff
├── normalizer.py # Converts raw nested API responses into a flat 23-column schema
├── validator.py # Row-level filtering and snapshot-level integrity checks
├── storage.py # Atomic Parquet writes, schema references, and checkpoints
├── scheduler.py # Loop executor, initial start offsets, signal handling
├── alert.py # Logger setup; writes an alert marker file after 5 consecutive failures
├── utils.py # Time and sleep/jitter helpers
├── requirements.txt # Package dependencies (httpx, pandas, pyarrow, pyyaml)
├── Makefile # Automation targets (setup, test, run, clean)
└── tests/ # Suite of unit tests for all components
Prerequisites
- Python 3.8+ (Developed and tested with Python 3.14)
- Make (utility for running Makefile targets)
Installation & Setup
Set up the Python virtual environment and install all dependencies:
make setup
Running the Collector
Mode 1: Continuous Daemon Mode
Runs indefinitely, fetching snapshots at the configured interval (default: 5 minutes). Each sleep is jittered by ±10% so that requests do not arrive on a perfectly regular schedule. Shuts down gracefully on SIGINT/SIGTERM.
make run
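The jittered interval and the graceful shutdown described above can be sketched as follows. This is a minimal illustration, not the code in scheduler.py or utils.py: the names `jittered_interval`, `install_stop_handlers`, and `run_loop` are hypothetical, and the real project may structure its loop differently.

```python
import random
import signal
import threading


def jittered_interval(base_seconds, jitter_fraction=0.10, rng=None):
    """Scale base_seconds by a random factor in [1 - jitter, 1 + jitter]."""
    rng = rng or random
    factor = 1.0 + rng.uniform(-jitter_fraction, jitter_fraction)
    return base_seconds * factor


def install_stop_handlers(stop_event):
    """Set stop_event when SIGINT or SIGTERM arrives (call from the main thread)."""
    def _handle(signum, frame):
        stop_event.set()

    signal.signal(signal.SIGINT, _handle)
    signal.signal(signal.SIGTERM, _handle)


def run_loop(run_cycle, base_seconds, stop_event):
    """Run cycles until stop_event is set; the sleep is interruptible."""
    while not stop_event.is_set():
        run_cycle()
        # Event.wait returns early if a signal handler sets the event,
        # so shutdown does not have to wait out the full interval.
        stop_event.wait(timeout=jittered_interval(base_seconds))
```

With a 300-second base interval, each sleep falls between roughly 270 and 330 seconds. Waiting on a `threading.Event` rather than calling `time.sleep` is one way to let a shutdown signal cut the sleep short.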
Mode 2: One-shot Mode (Test/Debug)
Runs exactly one cycle (one BUY snapshot and one SELL snapshot), writes the results to disk, and exits immediately:
make run-once
Mode 3: Validate-Only Mode
Validates existing Parquet files without making any network calls. It prints statistics (row count, min/max prices, payment methods) and checks for critical schema issues:
make validate PATH_TO_VALIDATE=data/raw/buy_ads/year=2026/month=06/day=05/
Running Tests
Run the test suite to verify the client, normalizer, storage, and validation behaviors:
make test
Output Directory Structure
Data is saved under the ./data/ folder in the project root:
data/
├── raw/
│   ├── buy_ads/
│   │   └── year=YYYY/month=MM/day=DD/
│   │       ├── _schema.parquet  # Empty schema reference
│   │       └── snapshot_YYYYMMDD_HHMMSS.parquet  # Atomic snapshot data
│   └── sell_ads/
│       └── year=YYYY/month=MM/day=DD/
│           ├── _schema.parquet
│           └── snapshot_YYYYMMDD_HHMMSS.parquet
├── logs/
│   └── collector.log  # Rotating logs
├── alerts/
│   └── YYYYMMDD_HHMMSS_5_failures.alert  # Alert marker JSON file (only on failures)
└── checkpoint.json  # Restart resilience marker
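As an illustration of the layout above, the following sketch builds the path of one snapshot file from a timestamp. The function name `snapshot_path` is hypothetical, and the use of UTC for the partition and file names is an assumption; storage.py may use a different convention.

```python
from datetime import datetime, timezone
from pathlib import Path


def snapshot_path(root, side, ts):
    """Return the Parquet path for one snapshot of the given side ("buy" or "sell")."""
    if side not in ("buy", "sell"):
        raise ValueError(f"side must be 'buy' or 'sell', got {side!r}")
    if ts.tzinfo is None:
        raise ValueError("ts must be timezone-aware")
    ts = ts.astimezone(timezone.utc)  # assumption: names are derived from UTC time
    partition = (
        Path(root) / "raw" / f"{side}_ads"
        / f"year={ts:%Y}" / f"month={ts:%m}" / f"day={ts:%d}"
    )
    return partition / f"snapshot_{ts:%Y%m%d_%H%M%S}.parquet"


example = snapshot_path("data", "buy", datetime(2026, 6, 5, 13, 30, 0, tzinfo=timezone.utc))
print(example.as_posix())
# data/raw/buy_ads/year=2026/month=06/day=05/snapshot_20260605_133000.parquet
```

The `key=value` directory names follow the Hive-style partitioning convention, which lets Parquet readers that understand it treat year, month, and day as columns when loading the whole tree.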
Checkpoint Format
The checkpoint file is updated after every successful snapshot, so a restarted daemon does not query the API until the configured interval has elapsed since the last completed snapshot:
{
  "last_completed_snapshot": "2026-06-05T13:30:00Z",
  "last_buy_ad_count": 47,
  "last_sell_ad_count": 53,
  "consecutive_failures": 0,
  "total_snapshots": 284,
  "first_snapshot": "2026-06-01T00:00:00Z",
  "version": "1.0"
}
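One way the restart behavior could work is sketched below: the checkpoint is written through a temporary file and renamed into place, and on startup the daemon computes how long it still has to wait. The helper names `write_checkpoint` and `seconds_until_next` are hypothetical, and the write-then-rename approach is an assumption rather than a description of storage.py.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def write_checkpoint(path, state):
    """Write state as JSON to a temp file, then rename it over path."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        # os.replace swaps the file in one step, so readers never see a partial file.
        os.replace(tmp, path)
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise


def seconds_until_next(checkpoint, interval_seconds, now):
    """Return how long to wait before the next snapshot (0.0 if already due)."""
    last = datetime.strptime(
        checkpoint["last_completed_snapshot"], "%Y-%m-%dT%H:%M:%SZ"
    ).replace(tzinfo=timezone.utc)
    elapsed = (now - last).total_seconds()
    return max(0.0, interval_seconds - elapsed)
```

For the checkpoint shown above and a 300-second interval, a daemon restarted at 13:32:00 UTC would wait 180 seconds, while one restarted at 13:40:00 UTC would start a cycle immediately.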