"""Background protector.

Periodically checks the entries in ``protector_list`` and blocks every
offending ``dst_addr`` via ``advanced_acl.add_ip``.
"""
from __future__ import annotations

import threading
import time
from typing import Optional

import advanced_acl
from auth_session import load_config
import protector_list
from analysis_connections import find_high_connections_for_src_ports
from log_util import get_logger

logger = get_logger("protector")


class ProtectorRunner:
    """Run protector scans, either once or repeatedly on a background thread."""

    # Floor (in seconds) for the pause between scans, so a zero or negative
    # ``scan_interval`` in the config cannot turn the loop into a busy loop.
    _MIN_INTERVAL = 1

    def __init__(self):
        cfg = load_config()
        # Seconds between scans; refreshed from the config after every scan.
        self.interval = int(cfg.get("scan_interval", 60))
        # Set by stop(); the background loop exits once it observes it.
        self._stop = threading.Event()
        # time.time() taken at the start of the most recent run_once() call.
        self.last_run_time: Optional[float] = None

    def run_once(self) -> None:
        """Scan every protector-list entry once and block offending addresses.

        A failure while handling one entry is logged and does not prevent the
        remaining entries from being processed.
        """
        # Record the start time to help external coordinators avoid duplicate runs.
        self.last_run_time = time.time()

        for entry in protector_list.load_list() or []:
            self._scan_entry(entry)

    def _scan_entry(self, entry) -> None:
        """Check one protector-list entry and block each matching ``dst_addr``."""
        target = entry.get("target_ip")
        threshold = entry.get("threshold")
        try:
            src_port = int(entry.get("src_port"))
        except (TypeError, ValueError):
            # A missing or garbled port must not abort the whole scan.
            logger.exception(f"跳过无效的保护条目 (src_port 无法解析): {entry!r}")
            return

        try:
            res = find_high_connections_for_src_ports(target, src_port, threshold=threshold)
            matches = res.get("by_src_port", {}).get(src_port, [])
        except Exception as e:
            logger.exception(f"检查失败 {target}:{src_port} - {e}")
            return

        if not matches:
            logger.info(f"{target}:{src_port} - 无异常连接")
            return

        for m in matches:
            dst = m.get("dst_addr")
            if not dst:
                # Nothing to block; never hand an empty address to the ACL layer.
                logger.warning(f"{target}:{src_port} - 匹配结果缺少 dst_addr，已跳过")
                continue
            self._block(dst)

    def _block(self, dst) -> None:
        """Ask ``advanced_acl.add_ip`` to block *dst* and log the outcome."""
        try:
            # Do not pass a custom comment here so add_ip will place the IP into
            # Test_* groups (a comment was causing new rules to be named
            # AutoBlock_..., not grouped).
            result = advanced_acl.add_ip(dst)
            # Support both the new dict result and the legacy (resp, data) tuple.
            if isinstance(result, dict):
                added = result.get("added")
                rule = result.get("rule")
                row_id = result.get("row_id")
                msg = result.get("message")
                logger.info(f"已尝试阻断 {dst}, added={added}, rule={rule}, row_id={row_id}, msg={msg}")
            else:
                # Assume the legacy (resp, data) tuple.
                resp, data = result
                logger.info(f"已尝试阻断 {dst}, 状态: {resp.status_code}, 返回: {data}")
        except Exception as e:
            logger.exception(f"阻断失败 {dst}: {e}")

    def _loop(self) -> None:
        """Scan repeatedly until stop() is called."""
        while not self._stop.is_set():
            try:
                self.run_once()
            except Exception:
                logger.exception("运行一次保护检查失败")

            # Reload the interval in case the config changed; on failure the
            # previous value is kept.
            try:
                cfg = load_config()
                self.interval = int(cfg.get("scan_interval", self.interval))
            except Exception:
                logger.exception("failed to reload scan_interval; keeping %s", self.interval)

            # Wait on the stop event instead of sleeping so stop() takes
            # effect immediately rather than after a full interval.
            self._stop.wait(max(self.interval, self._MIN_INTERVAL))

    def start(self) -> threading.Thread:
        """Start the scan loop on a daemon thread and return that thread.

        Each call starts a new thread; callers are expected to call stop()
        before starting again.
        """
        self._stop.clear()
        t = threading.Thread(target=self._loop, daemon=True)
        t.start()
        return t

    def stop(self) -> None:
        """Signal the background loop to exit."""
        self._stop.set()

    def get_interval(self) -> int:
        """Return current scan interval in seconds.

        The value is re-read from the config; if that fails, the last known
        interval is returned instead.
        """
        try:
            cfg = load_config()
            return int(cfg.get("scan_interval", self.interval))
        except Exception:
            return self.interval


def run_protector_blocking_once() -> None:
    """Create a ProtectorRunner and perform a single scan in the calling thread."""
    pr = ProtectorRunner()
    pr.run_once()


if __name__ == "__main__":
    r = ProtectorRunner()
    r.start()
    try:
        # Keep the main thread alive; the scan loop runs on a daemon thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        r.stop()