Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
107 lines
4.9 KiB
Python
107 lines
4.9 KiB
Python
import unittest
|
|
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
|
|
# Add the parent folder to path to import normalizer
|
|
import sys
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from normalizer import normalize_ad
|
|
|
|
class TestNormalizer(unittest.TestCase):
|
|
def setUp(self):
|
|
# Paths to sample responses
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
self.buy_json_path = os.path.join(current_dir, "..", "sample_responses", "response_buy.json")
|
|
self.sell_json_path = os.path.join(current_dir, "..", "sample_responses", "response_sell.json")
|
|
|
|
def test_normalize_buy_ad(self):
|
|
with open(self.buy_json_path, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
raw_ad = data["data"][0]
|
|
fetched_at = datetime(2026, 6, 5, 13, 30, 0, tzinfo=timezone.utc)
|
|
|
|
normalized = normalize_ad(raw_ad, "BUY", fetched_at)
|
|
|
|
# Verify schema keys
|
|
expected_keys = {
|
|
"snapshot_id", "fetched_at", "fetched_date", "trade_type", "adv_no",
|
|
"asset", "fiat", "price", "surplus_amount", "min_amount", "max_amount",
|
|
"tradable_quantity", "advertiser_no", "advertiser_name", "advertiser_type",
|
|
"month_order_count", "month_finish_rate", "positive_rate", "user_positive_rate",
|
|
"payment_methods", "payment_method_ids", "ad_created_at", "price_type"
|
|
}
|
|
self.assertEqual(set(normalized.keys()), expected_keys)
|
|
|
|
# Verify content mapping
|
|
self.assertEqual(normalized["snapshot_id"], "20260605T133000Z_BUY")
|
|
self.assertEqual(normalized["fetched_at"], fetched_at)
|
|
self.assertEqual(normalized["fetched_date"], "2026-06-05")
|
|
self.assertEqual(normalized["trade_type"], "BUY")
|
|
self.assertEqual(normalized["adv_no"], "6f8b2e12345")
|
|
self.assertEqual(normalized["asset"], "USDT")
|
|
self.assertEqual(normalized["fiat"], "VES")
|
|
self.assertEqual(normalized["price"], 58.50)
|
|
self.assertEqual(normalized["surplus_amount"], 1520.43)
|
|
self.assertEqual(normalized["min_amount"], 100.0)
|
|
self.assertEqual(normalized["max_amount"], 5000.0)
|
|
self.assertEqual(normalized["tradable_quantity"], 1520.43)
|
|
self.assertEqual(normalized["advertiser_no"], "ABC123456")
|
|
self.assertEqual(normalized["advertiser_name"], "CryptoTraderVE")
|
|
self.assertEqual(normalized["advertiser_type"], "merchant")
|
|
self.assertEqual(normalized["month_order_count"], 342)
|
|
self.assertEqual(normalized["month_finish_rate"], 0.97)
|
|
self.assertEqual(normalized["positive_rate"], 0.99)
|
|
self.assertEqual(normalized["user_positive_rate"], 0.99)
|
|
self.assertEqual(normalized["payment_methods"], ["BANESCO", "PAGO_MOVIL"])
|
|
self.assertEqual(normalized["payment_method_ids"], ["Banco_Banesco", "Pago_Movil"])
|
|
self.assertEqual(normalized["price_type"], "FIXED")
|
|
|
|
# Verify ad creation time parsed from 1749128400000ms
|
|
expected_create_time = datetime.fromtimestamp(1749128400000 / 1000, tz=timezone.utc)
|
|
self.assertEqual(normalized["ad_created_at"], expected_create_time)
|
|
|
|
def test_normalize_sell_ad(self):
|
|
with open(self.sell_json_path, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
raw_ad = data["data"][0]
|
|
fetched_at = datetime(2026, 6, 5, 13, 30, 0, tzinfo=timezone.utc)
|
|
|
|
normalized = normalize_ad(raw_ad, "SELL", fetched_at)
|
|
|
|
self.assertEqual(normalized["snapshot_id"], "20260605T133000Z_SELL")
|
|
self.assertEqual(normalized["trade_type"], "SELL")
|
|
self.assertEqual(normalized["adv_no"], "7a9c3d98765")
|
|
self.assertEqual(normalized["price"], 62.30)
|
|
self.assertEqual(normalized["payment_methods"], ["MERCANTIL", "PAGO_MOVIL"])
|
|
self.assertEqual(normalized["payment_method_ids"], ["Banco_Mercantil", "Pago_Movil"])
|
|
|
|
def test_defensive_handling(self):
|
|
# Test handling missing or corrupted keys
|
|
bad_raw_ad = {
|
|
"adv": {
|
|
"advNo": "bad_ad",
|
|
"price": "not_a_float",
|
|
"surplusAmount": None
|
|
},
|
|
"advertiser": {
|
|
"userNo": "bad_advertiser",
|
|
"monthOrderCount": "not_an_int"
|
|
}
|
|
}
|
|
|
|
fetched_at = datetime.now(timezone.utc)
|
|
normalized = normalize_ad(bad_raw_ad, "BUY", fetched_at)
|
|
|
|
# Should not crash, should fall back to defaults
|
|
self.assertEqual(normalized["adv_no"], "bad_ad")
|
|
self.assertEqual(normalized["price"], 0.0) # fallback
|
|
self.assertEqual(normalized["surplus_amount"], 0.0) # fallback
|
|
self.assertEqual(normalized["month_order_count"], 0) # fallback
|
|
self.assertEqual(normalized["advertiser_no"], "bad_advertiser")
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|