فهرست منبع

feat(gui): 添加阻断IP管理功能

- 新增阻断IP列表框及操作按钮
- 实现刷新阻断IP列表功能
- 支持手动添加和删除阻断IP
- 在防护任务完成后自动刷新阻断列表
- 增加对advanced_acl模块的调用处理
- 提供用户友好的错误提示信息
mcbaiyun 2 ماه پیش
والد
کامیت
63e1c33f5a
2فایلهای تغییر یافته به همراه116 افزوده شده و 0 حذف شده
  1. 105 0
      gui.py
  2. 11 0
      protector.py

+ 105 - 0
gui.py

@@ -121,6 +121,39 @@ class ProtectorGUI(tk.Tk):
             self.pause_btn = tk.Button(self, text="Pause", command=self._toggle_pause)
             self.pause_btn.pack(anchor="ne", padx=8)
 
+        # Blocked IPs frame (uses advanced_acl to query current Test_ prefixed rules)
+        bf = tk.LabelFrame(self, text="Blocked IPs (来自高级 ACL)")
+        bf.pack(fill="both", expand=False, padx=8, pady=6)
+
+        self.block_listbox = tk.Listbox(bf, height=8)
+        self.block_listbox.pack(side="left", fill="both", expand=True, padx=4, pady=4)
+
+        block_right = tk.Frame(bf)
+        block_right.pack(side="right", fill="y", padx=4)
+
+        tk.Button(block_right, text="刷新阻断列表", command=self.refresh_blocked_ips).pack(fill="x", pady=2)
+        tk.Label(block_right, text="手动加入 IP:").pack(anchor="w", pady=(8,0))
+        self.manual_ip_entry = tk.Entry(block_right)
+        self.manual_ip_entry.pack(fill="x")
+        tk.Button(block_right, text="加入阻断", command=self.manual_add_ip).pack(fill="x", pady=2)
+        tk.Button(block_right, text="从列表删除选中", command=self.manual_remove_selected).pack(fill="x", pady=2)
+
+        # Register on_run_complete callback only after GUI widgets exist
+        if self.protector_runner is not None:
+            def _on_protector_run_complete():
+                # schedule a GUI-thread refresh; guard if widget removed
+                try:
+                    if hasattr(self, "block_listbox") and self.block_listbox is not None:
+                        self.after(100, self.refresh_blocked_ips)
+                except Exception:
+                    pass
+
+            try:
+                self.protector_runner.on_run_complete = _on_protector_run_complete
+            except Exception:
+                # ignore if unable to set
+                pass
+
         # start initial delayed run countdown (10 seconds) if runner provided
         if self.protector_runner is not None:
             self._initial_delay = 10
@@ -150,6 +183,78 @@ class ProtectorGUI(tk.Tk):
         items = protector_list.load_list()
         for i in items:
             self.listbox.insert(tk.END, f"{i['id']}: {i['target_ip']}:{i['src_port']} (th={i.get('threshold')})")
+        # do NOT auto-refresh blocked IPs here (only refresh on explicit action or protector run)
+
+    def refresh_blocked_ips(self):
+        """Query advanced_acl.get_all_test_ips() and update the block_listbox."""
+        # defensive: ensure listbox exists
+        if not hasattr(self, "block_listbox") or self.block_listbox is None:
+            # nothing to refresh
+            return
+
+        try:
+            import advanced_acl
+        except Exception as e:
+            messagebox.showerror("错误", f"无法导入 advanced_acl: {e}")
+            return
+
+        try:
+            # clear listbox
+            try:
+                self.block_listbox.delete(0, tk.END)
+            except Exception:
+                # if delete fails, just recreate a fresh listbox entry
+                pass
+
+            ips = advanced_acl.get_all_test_ips()
+            # ips expected as list of dicts or simple ip strings
+            if isinstance(ips, dict):
+                # maybe returned as {"ips": [...]}
+                ips_list = ips.get("ips") or ips.get("data") or []
+            else:
+                ips_list = ips or []
+
+            for it in ips_list:
+                if isinstance(it, dict):
+                    ip = it.get("ip") or it.get("dst_addr") or str(it)
+                    comment = it.get("comment") or it.get("rule") or ""
+                    self.block_listbox.insert(tk.END, f"{ip}    {comment}")
+                else:
+                    self.block_listbox.insert(tk.END, str(it))
+        except Exception as e:
+            # show more helpful error including type
+            messagebox.showerror("错误", f"获取阻断列表失败: {type(e).__name__}: {e}")
+
+    def manual_add_ip(self):
+        ip = self.manual_ip_entry.get().strip()
+        if not ip:
+            messagebox.showerror("错误", "请输入要阻断的 IP")
+            return
+        try:
+            import advanced_acl
+            res = advanced_acl.add_ip(ip)
+            # best-effort feedback
+            messagebox.showinfo("加入阻断", f"尝试加入 {ip}: {res}")
+            self.refresh_blocked_ips()
+        except Exception as e:
+            messagebox.showerror("错误", f"加入阻断失败: {e}")
+
+    def manual_remove_selected(self):
+        sel = self.block_listbox.curselection()
+        if not sel:
+            messagebox.showerror("错误", "先选择要删除的项")
+            return
+        idx = sel[0]
+        item = self.block_listbox.get(idx)
+        # try to parse ip from the line (assume leading token)
+        ip = item.split()[0]
+        try:
+            import advanced_acl
+            res = advanced_acl.del_ip(ip)
+            messagebox.showinfo("删除阻断", f"尝试删除 {ip}: {res}")
+            self.refresh_blocked_ips()
+        except Exception as e:
+            messagebox.showerror("错误", f"删除阻断失败: {e}")
 
     def save_config(self):
         try:

+ 11 - 0
protector.py

@@ -24,6 +24,8 @@ class ProtectorRunner:
         # pause control: _pause_event is set when running, cleared when paused
         self._pause_event = threading.Event()
         self._pause_event.set()
+        # optional callback invoked after each run_once() completes. Signature: fn()
+        self.on_run_complete = None
 
     def run_once(self):
         # if paused, skip immediate execution (protect against external triggers)
@@ -76,6 +78,15 @@ class ProtectorRunner:
                         logger.info(f"已尝试阻断 {dst}, 状态: {resp.status_code}, 返回: {data}")
                 except Exception as e:
                     logger.exception(f"阻断失败 {dst}: {e}")
+        # invoke completion callback (best-effort, do not allow callback exceptions to break flow)
+        try:
+            if callable(self.on_run_complete):
+                try:
+                    self.on_run_complete()
+                except Exception:
+                    logger.exception("on_run_complete callback failed")
+        except Exception:
+            pass
 
     def start(self):
         self._stop.clear()