Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
80 lines
2.8 KiB
Python
80 lines
2.8 KiB
Python
import os
|
|
import json
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def setup_logging(config: dict) -> None:
|
|
"""
|
|
Configures the logger using parameters from the config dictionary.
|
|
Sets up both a console handler and a rotating file handler.
|
|
"""
|
|
log_config = config.get("logging", {})
|
|
level_str = log_config.get("level", "INFO")
|
|
level = getattr(logging, level_str.upper(), logging.INFO)
|
|
|
|
log_file = log_config.get("file", "./data/logs/collector.log")
|
|
max_bytes = log_config.get("max_bytes", 10485760)
|
|
backup_count = log_config.get("backup_count", 5)
|
|
log_format = log_config.get("format", "%(asctime)s | %(levelname)s | %(message)s")
|
|
|
|
# Ensure log directory exists
|
|
log_dir = os.path.dirname(log_file)
|
|
if log_dir:
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
|
|
# Set up root logger
|
|
root_logger = logging.getLogger()
|
|
root_logger.setLevel(level)
|
|
|
|
# Remove existing handlers to avoid duplicates on re-setup
|
|
for handler in list(root_logger.handlers):
|
|
root_logger.removeHandler(handler)
|
|
|
|
# Console handler
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setFormatter(logging.Formatter(log_format))
|
|
root_logger.addHandler(console_handler)
|
|
|
|
# Rotating file handler
|
|
try:
|
|
file_handler = RotatingFileHandler(
|
|
log_file, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
|
|
)
|
|
file_handler.setFormatter(logging.Formatter(log_format))
|
|
root_logger.addHandler(file_handler)
|
|
logger.info(f"Logging configured successfully. Writing to {log_file}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to set up file logging: {e}")
|
|
|
|
|
|
def write_alert_file(config: dict, timestamp: datetime, error_msg: str, consecutive_failures: int, traceback_str: str) -> str:
|
|
"""
|
|
Writes an alert marker JSON file to the configured alert directory.
|
|
Returns the path of the created alert file.
|
|
"""
|
|
alerts_config = config.get("alerts", {})
|
|
alert_dir = alerts_config.get("alert_dir", "./data/alerts")
|
|
os.makedirs(alert_dir, exist_ok=True)
|
|
|
|
time_str = timestamp.strftime("%Y%m%d_%H%M%S")
|
|
filename = f"{time_str}_{consecutive_failures}_failures.alert"
|
|
alert_path = os.path.join(alert_dir, filename)
|
|
|
|
content = {
|
|
"timestamp": timestamp.isoformat(),
|
|
"error": error_msg,
|
|
"consecutive_failures": consecutive_failures,
|
|
"traceback": traceback_str
|
|
}
|
|
|
|
try:
|
|
with open(alert_path, "w", encoding="utf-8") as f:
|
|
json.dump(content, f, indent=4)
|
|
logger.error(f"ALERT WRITTEN: {alert_path}")
|
|
return alert_path
|
|
except Exception as e:
|
|
logger.critical(f"Failed to write alert file at {alert_path}: {e}")
|
|
return ""
|