binance-p2p-market-history/p2p-collector/storage.py
Gabriel Ramos 2c41a7a6b3 feat: implement binance p2p collector daemon
Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
2026-06-05 14:40:05 -04:00

102 lines
3.5 KiB
Python

import os
import json
import logging
from pathlib import Path
from datetime import datetime
import pandas as pd
logger = logging.getLogger(__name__)
def _write_parquet_atomic(df: pd.DataFrame, final_path: Path) -> None:
    """Write ``df`` to ``final_path`` through a sibling ``.tmp`` file.

    The frame is serialized to ``<final_path>.tmp`` first and only then moved
    over ``final_path``, so ``final_path`` never holds a half-written file.
    If anything fails, the temporary file is removed (best effort) and the
    original exception propagates to the caller.
    """
    tmp_path = final_path.with_name(final_path.name + ".tmp")
    try:
        df.to_parquet(tmp_path, index=False, engine="pyarrow")
        # Path.replace overwrites an existing destination on every platform;
        # Path.rename raises FileExistsError on Windows in that case.
        tmp_path.replace(final_path)
    except Exception:
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            # Nothing was written before the failure; nothing to clean up.
            pass
        except OSError as cleanup_error:
            logger.error(f"Failed to delete temp file {tmp_path}: {cleanup_error}")
        raise


def store_parquet(rows: list, base_dir: str, fetched_at: datetime) -> str:
    """Store a snapshot of rows as a Parquet file in a date-partitioned directory.

    The snapshot is written to
    ``<base_dir>/year=YYYY/month=MM/day=DD/snapshot_YYYYMMDD_HHMMSS.parquet``,
    where every date/time component is taken from ``fetched_at``. Columns are
    sorted by name before writing so the column layout is consistent between
    snapshots. When the partition directory has no ``_schema.parquet`` yet, a
    zero-row frame with the same columns is written there as a schema
    reference; a failure to write that reference is logged but does not fail
    the call.

    Args:
        rows: Records to store, in any form ``pandas.DataFrame`` accepts
            (presumably a list of dicts, one per ad -- confirm with caller).
        base_dir: Root directory under which the partitions are created.
        fetched_at: Timestamp used for the partition path and the file name.

    Returns:
        The path of the written snapshot as a string, or ``""`` when ``rows``
        is empty (nothing is written in that case).

    Raises:
        Exception: Any error raised while building or writing the snapshot is
            logged and re-raised unchanged.
    """
    if not rows:
        logger.warning("No rows provided to store_parquet.")
        return ""

    dest_dir = (
        Path(base_dir)
        / f"year={fetched_at:%Y}"
        / f"month={fetched_at:%m}"
        / f"day={fetched_at:%d}"
    )
    dest_dir.mkdir(parents=True, exist_ok=True)
    final_path = dest_dir / f"snapshot_{fetched_at:%Y%m%d_%H%M%S}.parquet"

    try:
        df = pd.DataFrame(rows)
        # Sort columns to ensure a consistent schema layout.
        df = df.reindex(sorted(df.columns), axis=1)
        _write_parquet_atomic(df, final_path)
    except Exception as e:
        logger.error(f"Failed to write parquet file {final_path}: {e}")
        # Bare raise keeps the original traceback intact.
        raise
    logger.info(f"Successfully stored snapshot to {final_path}")

    # Schema consistency reference file: written once per partition directory.
    schema_path = dest_dir / "_schema.parquet"
    if not schema_path.exists():
        try:
            # Written atomically as well: a truncated reference file would
            # otherwise never be repaired, because of the exists() check above.
            _write_parquet_atomic(df.iloc[:0], schema_path)
            logger.info(f"Created schema reference file at {schema_path}")
        except Exception as e:
            logger.error(f"Failed to create schema reference file: {e}")

    return str(final_path)
def read_checkpoint(path: str) -> dict:
    """Read the checkpoint JSON file at ``path``.

    Args:
        path: Location of the checkpoint file.

    Returns:
        The decoded checkpoint dictionary. An empty dict is returned when the
        file does not exist, cannot be read or parsed, or holds valid JSON
        that is not an object (e.g. a list or ``null``), so callers can always
        treat the result as a dict.
    """
    try:
        # EAFP: open directly instead of checking for existence first, which
        # avoids a race between the check and the open.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        logger.info(f"Checkpoint file {path} not found. Starting fresh.")
        return {}
    except Exception as e:
        # Deliberately broad: an unreadable checkpoint must never stop the
        # caller, it only means starting without saved state.
        logger.warning(f"Checkpoint file {path} exists but is corrupted: {e}. Starting fresh.")
        return {}

    if not isinstance(data, dict):
        # Valid JSON of the wrong shape is as unusable as invalid JSON.
        logger.warning(
            f"Checkpoint file {path} exists but is corrupted: "
            f"expected a JSON object, got {type(data).__name__}. Starting fresh."
        )
        return {}

    logger.info(f"Successfully read checkpoint from {path}")
    return data
def write_checkpoint(path: str, data: dict) -> None:
    """Write the checkpoint dictionary to ``path`` as JSON.

    The data is first written to ``<path>.tmp``, flushed to disk, and then
    moved over ``path`` with ``os.replace``, so an interrupted write does not
    leave a truncated checkpoint at ``path``. The parent directory is created
    if needed.

    Write failures are logged and NOT raised: the previous checkpoint (if
    any) is left untouched and the temporary file is removed on a best-effort
    basis.

    Args:
        path: Destination of the checkpoint file.
        data: JSON-serializable checkpoint contents.

    Raises:
        OSError: Only if the parent directory cannot be created.
    """
    dir_path = os.path.dirname(path)
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)

    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)
            # Push the bytes to disk before the rename; otherwise a crash
            # shortly after os.replace could leave an empty checkpoint.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
        logger.debug(f"Checkpoint updated at {path}")
    except Exception as e:
        logger.error(f"Failed to write checkpoint to {path}: {e}")
        try:
            os.remove(tmp_path)
        except FileNotFoundError:
            # The temp file was never created or was already moved.
            pass
        except OSError as cleanup_error:
            logger.error(f"Failed to delete temp file {tmp_path}: {cleanup_error}")