Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
72 lines · 2.5 KiB · Python
import unittest
|
|
import os
|
|
import tempfile
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
import pandas as pd
|
|
|
|
import sys
|
|
# Put the directory one level above this test file's own directory on the
# import path so the project-local `storage` module imported below resolves.
# NOTE(review): appending means an earlier sys.path entry that also provides a
# module named `storage` would win — confirm that is not a concern here.
sys.path.append(str(Path(os.path.abspath(__file__)).parent.parent))
|
|
|
|
from storage import store_parquet, read_checkpoint, write_checkpoint
|
|
|
|
class TestStorage(unittest.TestCase):
    """Tests for `store_parquet` and the `read_checkpoint`/`write_checkpoint` helpers."""

    def setUp(self):
        """Create a fresh temporary directory to serve as the storage root."""
        self.temp_dir = tempfile.TemporaryDirectory()
        self.base_dir = self.temp_dir.name

    def tearDown(self):
        """Remove the temporary directory and everything written into it."""
        self.temp_dir.cleanup()

    def test_store_parquet(self):
        """Write two rows; check partition path, schema file, and data round-trip."""
        fetched_at = datetime(2026, 6, 5, 13, 30, 0, tzinfo=timezone.utc)
        rows = [
            {"col1": "val1", "col2": 1.5, "col3": True},
            {"col1": "val2", "col2": 2.5, "col3": False},
        ]

        # Write
        final_path = store_parquet(rows, self.base_dir, fetched_at)
        # Normalize so the checks below work whether store_parquet returns a
        # str or a path-like object such as pathlib.Path.
        final_path = os.path.normpath(os.fspath(final_path))

        # Verify path exists
        self.assertTrue(os.path.exists(final_path), final_path)

        # Verify the exact partition path:
        #   year=2026/month=06/day=05/snapshot_20260605_133000.parquet
        # An exact comparison (rather than string prefix/suffix matching) rejects
        # sibling directories that merely share a prefix and extra directory levels.
        expected_subdir = os.path.join(self.base_dir, "year=2026", "month=06", "day=05")
        expected_path = os.path.normpath(
            os.path.join(expected_subdir, "snapshot_20260605_133000.parquet")
        )
        self.assertEqual(final_path, expected_path)

        # Verify the schema file exists in the same partition directory
        schema_path = os.path.join(expected_subdir, "_schema.parquet")
        self.assertTrue(os.path.exists(schema_path), schema_path)

        # The schema file should contain no rows but carry the sorted column names
        df_schema = pd.read_parquet(schema_path)
        self.assertEqual(len(df_schema), 0)
        self.assertEqual(list(df_schema.columns), sorted(["col1", "col2", "col3"]))

        # Read data back and verify every column round-trips in row order
        df_data = pd.read_parquet(final_path)
        self.assertEqual(len(df_data), 2)
        self.assertEqual(df_data["col1"].tolist(), ["val1", "val2"])
        self.assertEqual(df_data["col2"].tolist(), [1.5, 2.5])
        self.assertEqual(df_data["col3"].tolist(), [True, False])

    def test_checkpoint(self):
        """A missing checkpoint reads as {}; a written one reads back unchanged."""
        checkpoint_path = os.path.join(self.base_dir, "checkpoint.json")

        # Missing checkpoint file -> empty dict
        self.assertEqual(read_checkpoint(checkpoint_path), {})

        # Write, then read back
        data = {
            "last_completed_snapshot": "2026-06-05T13:30:00Z",
            "last_buy_ad_count": 47,
            "last_sell_ad_count": 53,
        }
        write_checkpoint(checkpoint_path, data)

        read_data = read_checkpoint(checkpoint_path)
        self.assertEqual(read_data, data)
|
|
if __name__ == "__main__":
    # When executed directly as a script, discover and run the TestCase
    # classes defined in this module (module="__main__" is unittest's default).
    unittest.main(module="__main__")
|