log_util.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import logging
  2. from pathlib import Path
  3. import json
  4. from datetime import datetime, timedelta
  5. import glob
  6. import os
  7. import sys
  8. def _get_app_dir() -> Path:
  9. try:
  10. if getattr(sys, "frozen", False):
  11. return Path(sys.executable).resolve().parent
  12. except Exception:
  13. pass
  14. try:
  15. return Path.cwd()
  16. except Exception:
  17. return Path(__file__).resolve().parent
  18. LOG_DIR = _get_app_dir() / "logs"
  19. LOG_DIR.mkdir(parents=True, exist_ok=True)
  20. class DailyRotatingFileHandler(logging.Handler):
  21. """A simple daily rotating file handler that names files as {name}_YYYYMMDD.log
  22. It will open a new file when the date changes and optionally remove old log
  23. files older than backup_count days.
  24. """
  25. def __init__(self, name: str, backup_count: int = 7, encoding: str = "utf-8"):
  26. super().__init__()
  27. self.name = name
  28. self.backup_count = backup_count
  29. self.encoding = encoding
  30. self.current_date = datetime.now().strftime("%Y%m%d")
  31. self._fh = None
  32. def _open(self):
  33. filename = LOG_DIR / f"{self.name}_{self.current_date}.log"
  34. # Ensure parent exists (LOG_DIR already exists)
  35. self._fh = logging.FileHandler(filename, encoding=self.encoding)
  36. if self.formatter:
  37. self._fh.setFormatter(self.formatter)
  38. def _cleanup_old(self):
  39. # Remove files older than backup_count days matching pattern name_*.log
  40. pattern = str(LOG_DIR / f"{self.name}_*.log")
  41. files = glob.glob(pattern)
  42. cutoff = datetime.now() - timedelta(days=self.backup_count)
  43. for f in files:
  44. basename = os.path.basename(f)
  45. # Expect basename like name_YYYYMMDD.log
  46. try:
  47. parts = basename.rsplit("_", 1)
  48. if len(parts) != 2:
  49. continue
  50. date_part = parts[1].split(".")[0]
  51. file_date = datetime.strptime(date_part, "%Y%m%d")
  52. if file_date < cutoff:
  53. try:
  54. os.remove(f)
  55. except Exception:
  56. pass
  57. except Exception:
  58. # ignore parsing errors
  59. continue
  60. def emit(self, record: logging.LogRecord) -> None:
  61. now_date = datetime.now().strftime("%Y%m%d")
  62. if self._fh is None:
  63. self._open()
  64. self._cleanup_old()
  65. elif now_date != self.current_date:
  66. # Date changed: rotate
  67. try:
  68. self._fh.close()
  69. except Exception:
  70. pass
  71. self.current_date = now_date
  72. self._open()
  73. self._cleanup_old()
  74. # Delegate emission to underlying FileHandler
  75. try:
  76. if self._fh.formatter is None and self.formatter:
  77. self._fh.setFormatter(self.formatter)
  78. self._fh.emit(record)
  79. except Exception:
  80. self.handleError(record)
  81. def setFormatter(self, fmt: logging.Formatter) -> None:
  82. super().setFormatter(fmt)
  83. if self._fh:
  84. self._fh.setFormatter(fmt)
  85. def close(self) -> None:
  86. try:
  87. if self._fh:
  88. self._fh.close()
  89. finally:
  90. super().close()
  91. def _make_file_handler(name: str):
  92. handler = DailyRotatingFileHandler(name, backup_count=7, encoding="utf-8")
  93. formatter = logging.Formatter(
  94. "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
  95. datefmt="%Y-%m-%d %H:%M:%S",
  96. )
  97. handler.setFormatter(formatter)
  98. return handler
  99. def get_logger(name: str = "app") -> logging.Logger:
  100. logger = logging.getLogger(name)
  101. if logger.handlers:
  102. return logger
  103. logger.setLevel(logging.DEBUG)
  104. fh = _make_file_handler(name)
  105. logger.addHandler(fh)
  106. sh = logging.StreamHandler()
  107. sh.setFormatter(
  108. logging.Formatter("%(asctime)s | %(levelname)-7s | %(name)s | %(message)s", datefmt="%H:%M:%S")
  109. )
  110. logger.addHandler(sh)
  111. return logger
  112. def log_request(logger: logging.Logger, func_name: str, url: str, request_body, response):
  113. try:
  114. req_text = json.dumps(request_body, ensure_ascii=False)
  115. except Exception:
  116. req_text = str(request_body)
  117. # Try to capture response text or JSON
  118. resp_text = None
  119. try:
  120. resp_text = response.text
  121. except Exception:
  122. resp_text = str(response)
  123. # Use Chinese labels where appropriate, keep technical terms like URL and status code
  124. logger.info(
  125. f"调用: {func_name} | URL: {url} | 请求: {req_text} | 响应状态: {getattr(response, 'status_code', 'N/A')} | 响应: {resp_text}"
  126. )