"""Logging helpers: a daily-rotating file handler plus logger/request utilities."""

import glob
import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path

LOG_DIR = Path(__file__).resolve().parent / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)


class DailyRotatingFileHandler(logging.Handler):
    """Write records to ``LOG_DIR/{name}_YYYYMMDD.log``, one file per day.

    A new file is opened when the local date changes. Each time a file is
    opened, this handler's own files whose date stamp is older than
    ``backup_count`` days are removed.
    """

    def __init__(self, name: str, backup_count: int = 7, encoding: str = "utf-8"):
        """Create the handler; the log file is opened lazily on first emit.

        Args:
            name: Prefix of the log file names.
            backup_count: Age in days beyond which dated files are deleted.
            encoding: Text encoding used for the log files.
        """
        super().__init__()
        # NOTE: ``name`` is also the property logging.Handler uses for its
        # handler registry; kept as-is so existing callers reading
        # ``handler.name`` continue to work.
        self.name = name
        self.backup_count = backup_count
        self.encoding = encoding
        self.current_date = datetime.now().strftime("%Y%m%d")
        self._fh = None  # underlying logging.FileHandler once opened

    def _open(self):
        """Open the file handler for ``self.current_date``."""
        filename = LOG_DIR / f"{self.name}_{self.current_date}.log"
        self._fh = logging.FileHandler(filename, encoding=self.encoding)
        if self.formatter:
            self._fh.setFormatter(self.formatter)

    def _cleanup_old(self):
        """Delete this handler's dated log files older than the cutoff.

        Only files named exactly ``{name}_YYYYMMDD*.log`` are considered, so
        files of another logger that merely shares the prefix (for example
        ``{name}_worker_YYYYMMDD.log``) are left untouched.
        """
        # Escape both parts so glob metacharacters in the directory or in the
        # handler name are matched literally.
        pattern = os.path.join(
            glob.escape(str(LOG_DIR)),
            f"{glob.escape(self.name)}_*.log",
        )
        cutoff = datetime.now() - timedelta(days=self.backup_count)
        for path in glob.glob(pattern):
            parts = os.path.basename(path).rsplit("_", 1)
            if len(parts) != 2 or parts[0] != self.name:
                continue
            date_part = parts[1].split(".")[0]
            try:
                file_date = datetime.strptime(date_part, "%Y%m%d")
            except ValueError:
                # Not one of our dated files; leave it alone.
                continue
            if file_date < cutoff:
                try:
                    os.remove(path)
                except OSError:
                    # Best effort: a file that cannot be removed must not
                    # break logging.
                    pass

    def _switch_to(self, date_str: str) -> None:
        """Close any open file, then open the file for ``date_str``."""
        previous, self._fh = self._fh, None
        if previous is not None:
            try:
                previous.close()
            except Exception:
                # Best effort: failing to close the old file must not
                # prevent opening the new one.
                pass
        self.current_date = date_str
        self._open()
        self._cleanup_old()

    def emit(self, record: logging.LogRecord) -> None:
        """Write ``record`` to today's file, rotating first if needed.

        Any failure, including one while opening the file, is routed through
        ``handleError`` instead of propagating to the code that logged.
        """
        try:
            today = datetime.now().strftime("%Y%m%d")
            # Also refresh the date on the very first open: the handler may
            # have been constructed on an earlier day than the first record.
            if self._fh is None or today != self.current_date:
                self._switch_to(today)
            if self._fh.formatter is None and self.formatter:
                self._fh.setFormatter(self.formatter)
            self._fh.emit(record)
        except Exception:
            self.handleError(record)

    def setFormatter(self, fmt: logging.Formatter) -> None:
        """Set the formatter on this handler and on the open file handler."""
        super().setFormatter(fmt)
        if self._fh is not None:
            self._fh.setFormatter(fmt)

    def close(self) -> None:
        """Close the underlying file handler, then this handler."""
        try:
            if self._fh is not None:
                self._fh.close()
        finally:
            super().close()


def _make_file_handler(name: str):
    """Return a DailyRotatingFileHandler for ``name`` with the file format."""
    handler = DailyRotatingFileHandler(name, backup_count=7, encoding="utf-8")
    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    handler.setFormatter(formatter)
    return handler


def get_logger(name: str = "app") -> logging.Logger:
    """Return the logger ``name``, attaching file and stream handlers once.

    A logger that already has handlers is returned unchanged, so repeated
    calls do not add duplicate handlers.
    """
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger

    logger.setLevel(logging.DEBUG)
    logger.addHandler(_make_file_handler(name))

    sh = logging.StreamHandler()
    sh.setFormatter(
        logging.Formatter(
            "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s",
            datefmt="%H:%M:%S",
        )
    )
    logger.addHandler(sh)
    return logger


def log_request(logger: logging.Logger, func_name: str, url: str, request_body, response):
    """Log one request/response pair at INFO level.

    Args:
        logger: Logger that receives the message.
        func_name: Name of the calling function, included in the message.
        url: Request URL.
        request_body: Request payload; JSON-encoded when possible, otherwise
            rendered with ``str``.
        response: Response object; its ``text`` and ``status_code``
            attributes are used when available.
    """
    # Deliberately broad: a payload that cannot be serialised must never make
    # the logging helper itself fail.
    try:
        req_text = json.dumps(request_body, ensure_ascii=False)
    except Exception:
        req_text = str(request_body)

    try:
        resp_text = response.text
    except Exception:
        resp_text = str(response)

    # Use Chinese labels where appropriate, keep technical terms like URL and status code
    logger.info(
        f"调用: {func_name} | URL: {url} | 请求: {req_text} | 响应状态: {getattr(response, 'status_code', 'N/A')} | 响应: {resp_text}"
    )