ソースを参照

feat(log): 添加每日轮转日志文件处理器

- 实现 DailyRotatingFileHandler 类,支持按日期轮转日志文件
- 自动清理超过备份天数的日志文件
- 提供 get_logger 函数用于获取配置好的 logger 实例
- 支持同时输出日志到文件和控制台
- 添加 log_request 工具函数用于记录请求与响应信息
- 设置默认日志格式和时间戳格式
- 确保日志目录存在并可写入
- 支持 UTF-8 编码和自定义编码
- 处理日志记录异常情况避免程序崩溃
- 提供灵活的格式化器设置接口
mcbaiyun 2 ヶ月 前
コミット
655e71be58
1 ファイル変更135 行追加0 行削除
  1. 135 0
      log_util.py

+ 135 - 0
log_util.py

@@ -0,0 +1,135 @@
+import logging
+from pathlib import Path
+import json
+from datetime import datetime, timedelta
+import glob
+import os
+
+
+LOG_DIR = Path(__file__).resolve().parent / "logs"
+LOG_DIR.mkdir(parents=True, exist_ok=True)
+
+
+class DailyRotatingFileHandler(logging.Handler):
+    """A simple daily rotating file handler that names files as {name}_YYYYMMDD.log
+
+    It will open a new file when the date changes and optionally remove old log
+    files older than backup_count days.
+    """
+
+    def __init__(self, name: str, backup_count: int = 7, encoding: str = "utf-8"):
+        super().__init__()
+        self.name = name
+        self.backup_count = backup_count
+        self.encoding = encoding
+        self.current_date = datetime.now().strftime("%Y%m%d")
+        self._fh = None
+
+    def _open(self):
+        filename = LOG_DIR / f"{self.name}_{self.current_date}.log"
+        # Ensure parent exists (LOG_DIR already exists)
+        self._fh = logging.FileHandler(filename, encoding=self.encoding)
+        if self.formatter:
+            self._fh.setFormatter(self.formatter)
+
+    def _cleanup_old(self):
+        # Remove files older than backup_count days matching pattern name_*.log
+        pattern = str(LOG_DIR / f"{self.name}_*.log")
+        files = glob.glob(pattern)
+        cutoff = datetime.now() - timedelta(days=self.backup_count)
+        for f in files:
+            basename = os.path.basename(f)
+            # Expect basename like name_YYYYMMDD.log
+            try:
+                parts = basename.rsplit("_", 1)
+                if len(parts) != 2:
+                    continue
+                date_part = parts[1].split(".")[0]
+                file_date = datetime.strptime(date_part, "%Y%m%d")
+                if file_date < cutoff:
+                    try:
+                        os.remove(f)
+                    except Exception:
+                        pass
+            except Exception:
+                # ignore parsing errors
+                continue
+
+    def emit(self, record: logging.LogRecord) -> None:
+        now_date = datetime.now().strftime("%Y%m%d")
+        if self._fh is None:
+            self._open()
+            self._cleanup_old()
+        elif now_date != self.current_date:
+            # Date changed: rotate
+            try:
+                self._fh.close()
+            except Exception:
+                pass
+            self.current_date = now_date
+            self._open()
+            self._cleanup_old()
+
+        # Delegate emission to underlying FileHandler
+        try:
+            if self._fh.formatter is None and self.formatter:
+                self._fh.setFormatter(self.formatter)
+            self._fh.emit(record)
+        except Exception:
+            self.handleError(record)
+
+    def setFormatter(self, fmt: logging.Formatter) -> None:
+        super().setFormatter(fmt)
+        if self._fh:
+            self._fh.setFormatter(fmt)
+
+    def close(self) -> None:
+        try:
+            if self._fh:
+                self._fh.close()
+        finally:
+            super().close()
+
+
+def _make_file_handler(name: str):
+    handler = DailyRotatingFileHandler(name, backup_count=7, encoding="utf-8")
+    formatter = logging.Formatter(
+        "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+    handler.setFormatter(formatter)
+    return handler
+
+
+def get_logger(name: str = "app") -> logging.Logger:
+    logger = logging.getLogger(name)
+    if logger.handlers:
+        return logger
+    logger.setLevel(logging.DEBUG)
+    fh = _make_file_handler(name)
+    logger.addHandler(fh)
+    sh = logging.StreamHandler()
+    sh.setFormatter(
+        logging.Formatter("%(asctime)s | %(levelname)-7s | %(name)s | %(message)s", datefmt="%H:%M:%S")
+    )
+    logger.addHandler(sh)
+    return logger
+
+
+def log_request(logger: logging.Logger, func_name: str, url: str, request_body, response):
+    try:
+        req_text = json.dumps(request_body, ensure_ascii=False)
+    except Exception:
+        req_text = str(request_body)
+
+    # Try to capture response text or JSON
+    resp_text = None
+    try:
+        resp_text = response.text
+    except Exception:
+        resp_text = str(response)
+
+    # Use Chinese labels where appropriate, keep technical terms like URL and status code
+    logger.info(
+        f"调用: {func_name} | URL: {url} | 请求: {req_text} | 响应状态: {getattr(response, 'status_code', 'N/A')} | 响应: {resp_text}"
+    )