"""A simple tkinter GUI to edit config.json and manage protector list. Features: - Edit base_url, conn_threshold, scan_interval (saved to config.json) - Manage protector_list entries (add/edit/remove) stored in protector_list.json - On saving base_url, attempt login via auth_session.login() and store sess_key """ from __future__ import annotations import json import threading import time import tkinter as tk from tkinter import messagebox from pathlib import Path from auth_session import load_config, get_base_url, login, set_sess_key import protector_list ROOT = Path(__file__).resolve().parent CONFIG_PATH = ROOT / "config.json" def load_cfg(): return load_config() def save_cfg(cfg: dict): with open(CONFIG_PATH, "w", encoding="utf-8") as f: json.dump(cfg, f, ensure_ascii=False, indent=2) class ProtectorGUI(tk.Tk): def __init__(self, protector_runner=None): super().__init__() self.title("Protector GUI") self.geometry("700x710") self.cfg = load_cfg() # 配置区域 cf = tk.LabelFrame(self, text="配置") cf.pack(fill="x", padx=8, pady=6) tk.Label(cf, text="设备地址 (base_url):").grid(row=0, column=0, sticky="w") self.base_entry = tk.Entry(cf, width=60) self.base_entry.grid(row=0, column=1, padx=4, pady=2) tk.Label(cf, text="连接阈值:").grid(row=1, column=0, sticky="w") self.th_entry = tk.Entry(cf, width=10) self.th_entry.grid(row=1, column=1, sticky="w", padx=4, pady=2) tk.Label(cf, text="测试前缀:").grid(row=1, column=2, sticky="w", padx=(12,0)) self.test_prefix_entry = tk.Entry(cf, width=15) self.test_prefix_entry.grid(row=1, column=3, sticky="w", padx=4, pady=2) tk.Label(cf, text="规则 IP 上限:").grid(row=2, column=2, sticky="w", padx=(12,0)) self.rule_limit_entry = tk.Entry(cf, width=15) self.rule_limit_entry.grid(row=2, column=3, sticky="w", padx=4, pady=2) tk.Label(cf, text="检测间隔 (秒):").grid(row=2, column=0, sticky="w") self.interval_entry = tk.Entry(cf, width=10) self.interval_entry.grid(row=2, column=1, sticky="w", padx=4, pady=2) tk.Button(cf, text="保存配置", command=self.save_config).grid(row=3, column=1, sticky="w", pady=6) # 监控目标列表 lf = tk.LabelFrame(self, text="监控目标列表") lf.pack(fill="both", expand=True, padx=8, pady=6) self.listbox = tk.Listbox(lf) self.listbox.pack(side="left", fill="both", expand=True, padx=4, pady=4) self.listbox.bind("<>", self.on_select) right = tk.Frame(lf) right.pack(side="right", fill="y", padx=4) tk.Label(right, text="目标 IP:").pack(anchor="w") self.ip_entry = tk.Entry(right) self.ip_entry.pack(fill="x") tk.Label(right, text="源端口:").pack(anchor="w") self.port_entry = tk.Entry(right) self.port_entry.pack(fill="x") tk.Label(right, text="阈值(可选):").pack(anchor="w") self.entry_threshold = tk.Entry(right) self.entry_threshold.pack(fill="x") tk.Button(right, text="添加", command=self.add_entry).pack(fill="x", pady=4) tk.Button(right, text="更新", command=self.update_entry).pack(fill="x", pady=4) tk.Button(right, text="删除", command=self.remove_entry).pack(fill="x", pady=4) self.load_values() # 自动在后台尝试登录(启动后立即) def _auto_login(): try: resp, sess_cookie = login() if sess_cookie: set_sess_key(sess_cookie) # 不弹出大量通知,只有在需要时可打开下面这一行 # messagebox.showinfo("登录", f"自动登录完成, 状态: {resp.status_code}") except Exception as e: # 登录失败不阻塞 GUI,记录到控制台/弹出一条错误提示 try: messagebox.showerror("自动登录失败", str(e)) except Exception: print("自动登录失败:", e) threading.Thread(target=_auto_login, daemon=True).start() # Protector runner instance (optional) for countdown and triggering runs self.protector_runner = protector_runner # 倒计时标签 self.countdown_var = tk.StringVar(value="保护器:空闲") self.countdown_label = tk.Label(self, textvariable=self.countdown_var) self.countdown_label.pack(anchor="ne", padx=8, pady=4) # Pause/Resume button self.pause_btn = None if self.protector_runner is not None: self.pause_btn = tk.Button(self, text="暂停", command=self._toggle_pause) self.pause_btn.pack(anchor="ne", padx=8) # Blocked IPs frame (uses advanced_acl to query current Test_ prefixed rules) bf = tk.LabelFrame(self, text="已阻断的 IP(来自高级 ACL)") bf.pack(fill="both", expand=False, padx=8, pady=6) self.block_listbox = tk.Listbox(bf, height=8) self.block_listbox.pack(side="left", fill="both", expand=True, padx=4, pady=4) block_right = tk.Frame(bf) block_right.pack(side="right", fill="y", padx=4) tk.Button(block_right, text="刷新阻断列表", command=self.refresh_blocked_ips).pack(fill="x", pady=2) tk.Label(block_right, text="手动加入 IP:").pack(anchor="w", pady=(8,0)) self.manual_ip_entry = tk.Entry(block_right) self.manual_ip_entry.pack(fill="x") tk.Button(block_right, text="加入阻断", command=self.manual_add_ip).pack(fill="x", pady=2) tk.Button(block_right, text="从阻断列表删除选中项", command=self.manual_remove_selected).pack(fill="x", pady=2) # Register on_run_complete callback only after GUI widgets exist if self.protector_runner is not None: def _on_protector_run_complete(): # schedule a GUI-thread refresh; guard if widget removed try: if hasattr(self, "block_listbox") and self.block_listbox is not None: self.after(100, self.refresh_blocked_ips) except Exception: pass try: self.protector_runner.on_run_complete = _on_protector_run_complete except Exception: # ignore if unable to set pass # start initial delayed run countdown (10 seconds) if runner provided if self.protector_runner is not None: self._initial_delay = 10 self._countdown_seconds = self._initial_delay # schedule countdown update every 1 second via tkinter's after self.after(1000, self._update_countdown) def load_values(self): self.cfg = load_cfg() self.base_entry.delete(0, tk.END) self.base_entry.insert(0, self.cfg.get("base_url", "")) self.th_entry.delete(0, tk.END) self.th_entry.insert(0, str(self.cfg.get("conn_threshold", ""))) # new fields self.test_prefix_entry.delete(0, tk.END) self.test_prefix_entry.insert(0, str(self.cfg.get("test_prefix", "Test_"))) self.rule_limit_entry.delete(0, tk.END) self.rule_limit_entry.insert(0, str(self.cfg.get("rule_ip_limit", 1000))) self.interval_entry.delete(0, tk.END) self.interval_entry.insert(0, str(self.cfg.get("scan_interval", 60))) self.reload_listbox() def reload_listbox(self): self.listbox.delete(0, tk.END) items = protector_list.load_list() for i in items: self.listbox.insert(tk.END, f"{i['id']}: {i['target_ip']}:{i['src_port']} (th={i.get('threshold')})") # do NOT auto-refresh blocked IPs here (only refresh on explicit action or protector run) def refresh_blocked_ips(self): """Query advanced_acl.get_all_test_ips() and update the block_listbox.""" # defensive: ensure listbox exists if not hasattr(self, "block_listbox") or self.block_listbox is None: # nothing to refresh return try: import advanced_acl except Exception as e: messagebox.showerror("错误", f"无法导入 advanced_acl: {e}") return try: # clear listbox try: self.block_listbox.delete(0, tk.END) except Exception: # if delete fails, just recreate a fresh listbox entry pass ips = advanced_acl.get_all_test_ips() # ips expected as list of dicts or simple ip strings if isinstance(ips, dict): # maybe returned as {"ips": [...]} ips_list = ips.get("ips") or ips.get("data") or [] else: ips_list = ips or [] for it in ips_list: if isinstance(it, dict): ip = it.get("ip") or it.get("dst_addr") or str(it) comment = it.get("comment") or it.get("rule") or "" self.block_listbox.insert(tk.END, f"{ip} {comment}") else: self.block_listbox.insert(tk.END, str(it)) except Exception as e: # show more helpful error including type messagebox.showerror("错误", f"获取阻断列表失败: {type(e).__name__}: {e}") def manual_add_ip(self): ip = self.manual_ip_entry.get().strip() if not ip: messagebox.showerror("错误", "请输入要阻断的 IP") return try: import advanced_acl res = advanced_acl.add_ip(ip) # best-effort feedback messagebox.showinfo("加入阻断", f"尝试加入 {ip}: {res}") self.refresh_blocked_ips() except Exception as e: messagebox.showerror("错误", f"加入阻断失败: {e}") def manual_remove_selected(self): sel = self.block_listbox.curselection() if not sel: messagebox.showerror("错误", "先选择要删除的项") return idx = sel[0] item = self.block_listbox.get(idx) # try to parse ip from the line (assume leading token) ip = item.split()[0] try: import advanced_acl res = advanced_acl.del_ip(ip) messagebox.showinfo("删除阻断", f"尝试删除 {ip}: {res}") self.refresh_blocked_ips() except Exception as e: messagebox.showerror("错误", f"删除阻断失败: {e}") def save_config(self): try: self.cfg["base_url"] = self.base_entry.get().strip() self.cfg["conn_threshold"] = int(self.th_entry.get().strip()) # optional new fields self.cfg["test_prefix"] = str(self.test_prefix_entry.get().strip()) self.cfg["rule_ip_limit"] = int(self.rule_limit_entry.get().strip()) self.cfg["scan_interval"] = int(self.interval_entry.get().strip()) except Exception as e: messagebox.showerror("错误", f"配置输入无效: {e}") return save_cfg(self.cfg) # try login on a background thread def _login(): try: resp, sess_cookie = login() if sess_cookie: set_sess_key(sess_cookie) messagebox.showinfo("登录", f"登录完成, 状态: {resp.status_code}") except Exception as e: messagebox.showerror("登录失败", str(e)) threading.Thread(target=_login, daemon=True).start() def on_select(self, evt): sel = self.listbox.curselection() if not sel: return idx = sel[0] items = protector_list.load_list() if idx >= len(items): return item = items[idx] self.ip_entry.delete(0, tk.END) self.ip_entry.insert(0, item.get("target_ip", "")) self.port_entry.delete(0, tk.END) self.port_entry.insert(0, str(item.get("src_port", ""))) self.entry_threshold.delete(0, tk.END) self.entry_threshold.insert(0, str(item.get("threshold", ""))) def add_entry(self): ip = self.ip_entry.get().strip() port = self.port_entry.get().strip() th = self.entry_threshold.get().strip() or None if not ip or not port: messagebox.showerror("错误", "ip 与 port 为必填") return try: entry = protector_list.add_entry(ip, int(port), int(th) if th else None) self.reload_listbox() messagebox.showinfo("添加", f"已添加: {entry}") except Exception as e: messagebox.showerror("错误", str(e)) def update_entry(self): sel = self.listbox.curselection() if not sel: messagebox.showerror("错误", "先选择一条") return idx = sel[0] items = protector_list.load_list() if idx >= len(items): return eid = items[idx]["id"] ip = self.ip_entry.get().strip() port = self.port_entry.get().strip() th = self.entry_threshold.get().strip() or None try: updated = protector_list.update_entry(eid, target_ip=ip, src_port=int(port), threshold=(int(th) if th else None)) if updated is None: messagebox.showerror("错误", "更新失败") else: self.reload_listbox() messagebox.showinfo("更新", f"已更新: {updated}") except Exception as e: messagebox.showerror("错误", str(e)) def remove_entry(self): sel = self.listbox.curselection() if not sel: messagebox.showerror("错误", "先选择一条") return idx = sel[0] items = protector_list.load_list() if idx >= len(items): return eid = items[idx]["id"] ok = protector_list.remove_entry(eid) if ok: self.reload_listbox() messagebox.showinfo("删除", "已删除") else: messagebox.showerror("错误", "删除失败") def _update_countdown(self): """Update countdown label every second and trigger protector run when it reaches zero.""" try: if not self.protector_runner: return # Update label: compute seconds until next run based on protector_runner's schedule try: if self.protector_runner.is_paused(): self.countdown_var.set("保护器:已暂停") else: next_ts = self.protector_runner.get_next_run_time() if next_ts is None: # fallback to internal counter secs = max(0, int(self._countdown_seconds)) else: secs = max(0, int(round(next_ts - time.time()))) self.countdown_var.set(f"下一次检测:{secs}s") except Exception: self.countdown_var.set("保护器:运行中") # if protector hasn't been started yet, use internal countdown try: started = getattr(self.protector_runner, "_started", False) except Exception: started = False if not started: # use internal countdown to start if self._countdown_seconds <= 0: do_start = True else: do_start = False else: # protector already started; rely on its internal schedule do_start = False if do_start: # If protector runner not started yet, start its loop first try: if not getattr(self.protector_runner, "_started", False): # start the background loop self.protector_runner._started = True threading.Thread(target=self.protector_runner.start, daemon=True).start() except Exception: pass # ProtectorRunner.start() will perform an initial run, so do not # explicitly call run_once() here to avoid double execution. # reset internal countdown to interval after start try: self._countdown_seconds = int(self.protector_runner.get_interval()) except Exception: self._countdown_seconds = self._initial_delay # update pause button text try: if self.pause_btn: self.pause_btn.config(text=("恢复" if self.protector_runner.is_paused() else "暂停")) except Exception: pass else: # when protector not started, decrement the internal counter for display if not started: self._countdown_seconds -= 1 except Exception as e: # keep ticking even on error print("倒计时更新出错:", e) # schedule next tick try: self.after(1000, self._update_countdown) except Exception: pass def _toggle_pause(self): if not self.protector_runner: return try: if self.protector_runner.is_paused(): self.protector_runner.resume() if self.pause_btn: self.pause_btn.config(text="暂停") else: self.protector_runner.pause() if self.pause_btn: self.pause_btn.config(text="恢复") except Exception as e: messagebox.showerror("错误", f"切换暂停/恢复失败: {e}") def run_gui(protector_runner=None): app = ProtectorGUI(protector_runner=protector_runner) app.mainloop() if __name__ == "__main__": run_gui()