| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- """Background protector: periodically checks protector_list entries and blocks offending dst_addr via advanced_acl.add_ip
- """
- from __future__ import annotations
- import threading
- import time
- from typing import Optional
- import advanced_acl
- from auth_session import load_config
- import protector_list
- from analysis_connections import find_high_connections_for_src_ports
- from log_util import get_logger
- logger = get_logger("protector")
- class ProtectorRunner:
- def __init__(self):
- cfg = load_config()
- self.interval = int(cfg.get("scan_interval", 60))
- self._stop = threading.Event()
- self.last_run_time: Optional[float] = None
- # pause control: _pause_event is set when running, cleared when paused
- self._pause_event = threading.Event()
- self._pause_event.set()
- def run_once(self):
- # if paused, skip immediate execution (protect against external triggers)
- try:
- if not self._pause_event.is_set():
- logger.info("Protector is paused, skipping run_once")
- return
- except Exception:
- pass
- # record start time to help external coordinators avoid duplicate runs
- try:
- self.last_run_time = time.time()
- except Exception:
- pass
- items = protector_list.load_list()
- for entry in items:
- target = entry.get("target_ip")
- src_port = int(entry.get("src_port"))
- threshold = entry.get("threshold")
- try:
- res = find_high_connections_for_src_ports(target, src_port, threshold=threshold)
- except Exception as e:
- logger.exception(f"检查失败 {target}:{src_port} - {e}")
- continue
- matches = res.get("by_src_port", {}).get(int(src_port), [])
- if not matches:
- logger.info(f"{target}:{src_port} - 无异常连接")
- continue
- for m in matches:
- dst = m.get("dst_addr")
- # Call advanced_acl.add_ip to block
- try:
- # Do not pass custom comment here so add_ip will place the IP into Test_* groups
- # (comment was causing new rules to be named AutoBlock_..., not grouped).
- result = advanced_acl.add_ip(dst)
- # Support both legacy (resp, data) tuple and new dict result
- if isinstance(result, dict):
- added = result.get("added")
- rule = result.get("rule")
- row_id = result.get("row_id")
- msg = result.get("message")
- logger.info(f"已尝试阻断 {dst}, added={added}, rule={rule}, row_id={row_id}, msg={msg}")
- else:
- # assume (resp, data)
- resp, data = result
- logger.info(f"已尝试阻断 {dst}, 状态: {resp.status_code}, 返回: {data}")
- except Exception as e:
- logger.exception(f"阻断失败 {dst}: {e}")
- def start(self):
- self._stop.clear()
- def _loop():
- while not self._stop.is_set():
- # respect pause state
- if not self._pause_event.is_set():
- # paused: wait until unpaused or stopped
- # wake every 1s to check stop flag
- self._pause_event.wait(timeout=1)
- continue
- try:
- self.run_once()
- except Exception:
- logger.exception("运行一次保护检查失败")
- # reload interval in case config changed
- try:
- cfg = load_config()
- self.interval = int(cfg.get("scan_interval", self.interval))
- except Exception:
- pass
- # sleep in one-second steps so we can be responsive to pause/stop
- slept = 0
- while slept < self.interval and not self._stop.is_set():
- if not self._pause_event.is_set():
- break
- time.sleep(1)
- slept += 1
- t = threading.Thread(target=_loop, daemon=True)
- t.start()
- return t
- def stop(self):
- self._stop.set()
- def get_interval(self) -> int:
- """Return current scan interval in seconds."""
- try:
- cfg = load_config()
- return int(cfg.get("scan_interval", self.interval))
- except Exception:
- return self.interval
- def get_next_run_time(self) -> float | None:
- """Return the epoch timestamp of the next scheduled run, or None if unknown."""
- try:
- interval = self.get_interval()
- if self.last_run_time:
- return float(self.last_run_time + interval)
- # if never run, next run is interval seconds from now (if started)
- return float(time.time() + interval)
- except Exception:
- return None
- def pause(self):
- """Pause periodic execution."""
- try:
- self._pause_event.clear()
- logger.info("Protector 已暂停")
- except Exception:
- pass
- def resume(self):
- """Resume periodic execution."""
- try:
- self._pause_event.set()
- logger.info("Protector 已恢复")
- except Exception:
- pass
- def is_paused(self) -> bool:
- return not self._pause_event.is_set()
- def run_protector_blocking_once():
- pr = ProtectorRunner()
- pr.run_once()
- if __name__ == "__main__":
- r = ProtectorRunner()
- r.start()
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- r.stop()
|