| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- import logging
- from pathlib import Path
- import json
- from datetime import datetime, timedelta
- import glob
- import os
# Directory that holds every log file: a "logs" folder next to this module.
_MODULE_DIR = Path(__file__).resolve().parent
LOG_DIR = _MODULE_DIR.joinpath("logs")
# Created at import time so handlers can open files without checking first.
LOG_DIR.mkdir(parents=True, exist_ok=True)
class DailyRotatingFileHandler(logging.Handler):
    """Write log records to a per-day file named ``{name}_YYYYMMDD.log``.

    Files are created inside ``LOG_DIR``.  The date is checked on every
    emitted record; when the local calendar date differs from the date of the
    currently open file (or no file is open yet) a new file is opened.  After
    each (re)open, files with the same ``name`` prefix whose date stamp is
    ``backup_count`` or more days before today are deleted.  The file that is
    currently being written is never deleted.

    Args:
        name: File-name prefix.  It is also assigned to the handler's
            ``name`` attribute.
        backup_count: Retention window in days used by the cleanup step.
        encoding: Text encoding passed to the underlying ``FileHandler``.
    """

    _SUFFIX = ".log"
    _DATE_FORMAT = "%Y%m%d"

    def __init__(self, name: str, backup_count: int = 7, encoding: str = "utf-8"):
        super().__init__()
        # NOTE: ``name`` is a property on logging.Handler, so this assignment
        # also sets the handler's logging-level name.
        self.name = name
        self.backup_count = backup_count
        self.encoding = encoding
        self.current_date = datetime.now().strftime(self._DATE_FORMAT)
        self._fh = None

    def _log_path(self) -> Path:
        """Return the path of the file for ``self.current_date``."""
        return LOG_DIR / f"{self.name}_{self.current_date}{self._SUFFIX}"

    def _open(self):
        """Open the file for ``self.current_date`` and apply the formatter."""
        self._fh = logging.FileHandler(self._log_path(), encoding=self.encoding)
        if self.formatter:
            self._fh.setFormatter(self.formatter)

    def _cleanup_old(self):
        """Delete expired ``{name}_YYYYMMDD.log`` files (best effort)."""
        prefix = f"{self.name}_"
        active_name = self._log_path().name
        # Compare calendar dates only, so the outcome does not depend on the
        # time of day at which the cleanup happens to run.
        cutoff = datetime.now().date() - timedelta(days=self.backup_count)
        # Escape both parts so glob metacharacters are matched literally.
        pattern = os.path.join(
            glob.escape(str(LOG_DIR)),
            f"{glob.escape(self.name)}_*{self._SUFFIX}",
        )
        for path in glob.glob(pattern):
            basename = os.path.basename(path)
            if basename == active_name:
                continue
            # Require exactly "<name>_<8 digits>.log"; this skips files that
            # belong to a longer prefix such as "<name>_worker_YYYYMMDD.log".
            stamp = basename[len(prefix):-len(self._SUFFIX)]
            if len(stamp) != 8 or not stamp.isdigit():
                continue
            try:
                file_date = datetime.strptime(stamp, self._DATE_FORMAT).date()
            except ValueError:
                # Eight digits that do not form a valid date: not ours.
                continue
            if file_date <= cutoff:
                try:
                    os.remove(path)
                except OSError:
                    # Cleanup must never prevent logging from continuing.
                    pass

    def _rotate(self, today: str) -> None:
        """Close the current file (if any) and open the one for ``today``."""
        previous, self._fh = self._fh, None
        if previous is not None:
            try:
                previous.close()
            except Exception:
                # A failing close must not block opening the new file.
                pass
        self.current_date = today
        self._open()
        self._cleanup_old()

    def emit(self, record: logging.LogRecord) -> None:
        """Write ``record`` to today's file, rotating first when needed.

        Any failure, including a failure to open the file, is reported
        through ``handleError`` instead of propagating to the caller.
        """
        try:
            today = datetime.now().strftime(self._DATE_FORMAT)
            # Also covers the first record: the date stored by __init__ may
            # already be stale if the handler was created on an earlier day.
            if self._fh is None or today != self.current_date:
                self._rotate(today)
            if self._fh.formatter is None and self.formatter:
                self._fh.setFormatter(self.formatter)
            self._fh.emit(record)
        except Exception:
            self.handleError(record)

    def setFormatter(self, fmt: logging.Formatter) -> None:
        """Set the formatter here and on the open file handler, if any."""
        super().setFormatter(fmt)
        if self._fh:
            self._fh.setFormatter(fmt)

    def flush(self) -> None:
        """Flush the underlying file handler, if one is open."""
        if self._fh is not None:
            self._fh.flush()

    def close(self) -> None:
        """Close the underlying file handler, then this handler."""
        try:
            if self._fh is not None:
                self._fh.close()
                # Drop the reference so a later emit opens a fresh handler.
                self._fh = None
        finally:
            super().close()
def _make_file_handler(name: str):
    """Build the daily-rotating file handler used for logger ``name``.

    The handler uses a 7-day ``backup_count``, UTF-8 encoding, and a
    ``timestamp | level | logger | message`` line format with a full date.
    """
    file_handler = DailyRotatingFileHandler(name, backup_count=7, encoding="utf-8")
    file_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )
    return file_handler
def get_logger(name: str = "app") -> logging.Logger:
    """Return the logger called ``name``, configuring it on first use.

    A logger that already has handlers is returned untouched.  Otherwise its
    level is set to DEBUG and two handlers are attached, in this order: the
    file handler from ``_make_file_handler`` and a ``StreamHandler`` with a
    time-only timestamp format.
    """
    log = logging.getLogger(name)
    if not log.handlers:
        log.setLevel(logging.DEBUG)
        log.addHandler(_make_file_handler(name))

        console = logging.StreamHandler()
        console_format = logging.Formatter(
            "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s",
            datefmt="%H:%M:%S",
        )
        console.setFormatter(console_format)
        log.addHandler(console)
    return log
def log_request(logger: logging.Logger, func_name: str, url: str, request_body, response):
    """Log one call and its response as a single INFO line on ``logger``.

    The request body is rendered with ``json.dumps`` (non-ASCII characters
    kept as-is) and falls back to ``str(request_body)`` if that fails.  The
    response body is read from ``response.text`` and falls back to
    ``str(response)``.  The status is ``response.status_code`` when that
    attribute exists, otherwise ``N/A``.
    """
    try:
        body_repr = json.dumps(request_body, ensure_ascii=False)
    except Exception:
        body_repr = str(request_body)

    try:
        reply_repr = response.text
    except Exception:
        reply_repr = str(response)

    # Labels are in Chinese; technical terms such as URL are kept as-is.
    logger.info(
        f"调用: {func_name} | URL: {url} | 请求: {body_repr} | 响应状态: {getattr(response, 'status_code', 'N/A')} | 响应: {reply_repr}"
    )
|