| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- """A simple tkinter GUI to edit config.json and manage protector list.
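- Features:
- - Edit base_url, conn_threshold, test_prefix, rule_ip_limit and scan_interval (saved to config.json)
- - Manage protector_list entries (add/update/remove) stored in protector_list.json
- - At start-up and after every config save, attempt login via auth_session.login() and store sess_key
- - When a protector runner is supplied, show a countdown and trigger its run_once() each time it expires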
- """
- from __future__ import annotations
- import json
- import threading
- import tkinter as tk
- from tkinter import messagebox
- from pathlib import Path
- from auth_session import load_config, get_base_url, login, set_sess_key
- import protector_list
# Directory containing this module; used as the base for files stored next to it.
ROOT = Path(__file__).resolve().parent
# Location of the JSON configuration file written by save_cfg().
CONFIG_PATH = ROOT.joinpath("config.json")
def load_cfg():
    """Return the configuration produced by ``auth_session.load_config()``."""
    cfg = load_config()
    return cfg
def save_cfg(cfg: dict):
    """Write ``cfg`` to ``CONFIG_PATH`` as indented UTF-8 JSON.

    The configuration is serialized *before* the file is opened for writing,
    so a value that cannot be encoded as JSON raises without truncating the
    existing config file.

    Args:
        cfg: Configuration mapping to persist.

    Raises:
        TypeError: If ``cfg`` contains a value ``json`` cannot serialize.
        ValueError: If ``cfg`` contains a circular reference.
        OSError: If the file cannot be opened or written.
    """
    text = json.dumps(cfg, ensure_ascii=False, indent=2)
    with open(CONFIG_PATH, "w", encoding="utf-8") as f:
        f.write(text)
class ProtectorGUI(tk.Tk):
    """Main window for editing the protector configuration and protector list.

    The window contains a "Config" form (base_url, conn_threshold,
    test_prefix, rule_ip_limit, scan_interval), a "Protector List" editor and
    a countdown label.  When a ``protector_runner`` is supplied the countdown
    periodically triggers ``protector_runner.run_once`` on a daemon thread.

    NOTE(review): login results are reported with ``messagebox`` from worker
    threads; this relies on tkinter forwarding those calls to the thread that
    runs the main loop -- confirm on the target Python/Tcl build.
    """

    def __init__(self, protector_runner=None):
        """Build the widgets, load stored values and start background work.

        Args:
            protector_runner: Optional object exposing ``run_once()`` and
                ``get_interval()``.  When given, a countdown starts at 10
                seconds and each expiry triggers ``run_once``.
        """
        super().__init__()
        self.title("Protector GUI")
        self.geometry("700x500")
        self.cfg = load_cfg()

        self._build_config_frame()
        self._build_list_frame()
        self.load_values()

        # Attempt a login in the background right after start-up so the
        # login call never blocks the window from appearing.
        threading.Thread(target=self._auto_login, daemon=True).start()

        # Optional runner used for the countdown and for triggering runs.
        self.protector_runner = protector_runner

        self.countdown_var = tk.StringVar(value="Protector: idle")
        self.countdown_label = tk.Label(self, textvariable=self.countdown_var)
        self.countdown_label.pack(anchor="ne", padx=8, pady=4)

        if self.protector_runner is not None:
            # The first run happens after a fixed 10 second delay; later
            # runs use the runner's own interval (see _update_countdown).
            self._initial_delay = 10
            self._countdown_seconds = self._initial_delay
            self.after(1000, self._update_countdown)

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def _build_config_frame(self):
        """Create the "Config" form and its "Save Config" button."""
        cf = tk.LabelFrame(self, text="Config")
        cf.pack(fill="x", padx=8, pady=6)

        tk.Label(cf, text="base_url:").grid(row=0, column=0, sticky="w")
        self.base_entry = tk.Entry(cf, width=60)
        self.base_entry.grid(row=0, column=1, padx=4, pady=2)

        tk.Label(cf, text="conn_threshold:").grid(row=1, column=0, sticky="w")
        self.th_entry = tk.Entry(cf, width=10)
        self.th_entry.grid(row=1, column=1, sticky="w", padx=4, pady=2)

        tk.Label(cf, text="test_prefix:").grid(
            row=1, column=2, sticky="w", padx=(12, 0)
        )
        self.test_prefix_entry = tk.Entry(cf, width=15)
        self.test_prefix_entry.grid(row=1, column=3, sticky="w", padx=4, pady=2)

        tk.Label(cf, text="rule_ip_limit:").grid(
            row=2, column=2, sticky="w", padx=(12, 0)
        )
        self.rule_limit_entry = tk.Entry(cf, width=15)
        self.rule_limit_entry.grid(row=2, column=3, sticky="w", padx=4, pady=2)

        tk.Label(cf, text="scan_interval (s):").grid(row=2, column=0, sticky="w")
        self.interval_entry = tk.Entry(cf, width=10)
        self.interval_entry.grid(row=2, column=1, sticky="w", padx=4, pady=2)

        tk.Button(cf, text="Save Config", command=self.save_config).grid(
            row=3, column=1, sticky="w", pady=6
        )

    def _build_list_frame(self):
        """Create the "Protector List" listbox and the entry editor beside it."""
        lf = tk.LabelFrame(self, text="Protector List")
        lf.pack(fill="both", expand=True, padx=8, pady=6)

        self.listbox = tk.Listbox(lf)
        self.listbox.pack(side="left", fill="both", expand=True, padx=4, pady=4)
        self.listbox.bind("<<ListboxSelect>>", self.on_select)

        right = tk.Frame(lf)
        right.pack(side="right", fill="y", padx=4)

        tk.Label(right, text="target_ip:").pack(anchor="w")
        self.ip_entry = tk.Entry(right)
        self.ip_entry.pack(fill="x")

        tk.Label(right, text="src_port:").pack(anchor="w")
        self.port_entry = tk.Entry(right)
        self.port_entry.pack(fill="x")

        tk.Label(right, text="threshold (optional):").pack(anchor="w")
        self.entry_threshold = tk.Entry(right)
        self.entry_threshold.pack(fill="x")

        tk.Button(right, text="Add", command=self.add_entry).pack(fill="x", pady=4)
        tk.Button(right, text="Update", command=self.update_entry).pack(
            fill="x", pady=4
        )
        tk.Button(right, text="Remove", command=self.remove_entry).pack(
            fill="x", pady=4
        )

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _optional_text(value):
        """Return ``value`` as entry text, mapping ``None`` to an empty string.

        Without this, a missing threshold would be shown as the literal text
        "None", which cannot be parsed back into an integer on update.
        """
        return "" if value is None else str(value)

    @staticmethod
    def _parse_optional_int(text):
        """Parse an optional integer field; blank text yields ``None``.

        Raises:
            ValueError: If ``text`` is non-blank and not a valid integer.
        """
        text = text.strip()
        return int(text) if text else None

    @staticmethod
    def _set_entry(entry, text):
        """Replace the whole content of ``entry`` with ``text``."""
        entry.delete(0, tk.END)
        entry.insert(0, text)

    def _item_at(self, index):
        """Return the stored protector entry at ``index``, or ``None``.

        The list is re-read through ``protector_list.load_list()`` on every
        call; ``None`` is returned when ``index`` is past the end of it.
        """
        items = protector_list.load_list()
        if index >= len(items):
            return None
        return items[index]

    # ------------------------------------------------------------------
    # Login (runs on worker threads)
    # ------------------------------------------------------------------
    def _auto_login(self):
        """Log in at start-up; success is silent, failure is reported."""
        try:
            _resp, sess_cookie = login()
            if sess_cookie:
                set_sess_key(sess_cookie)
        except Exception as e:
            # A failed login must not block the GUI.  Showing the dialog can
            # itself fail (presumably when the main loop is not running
            # yet), so fall back to the console in that case.
            try:
                messagebox.showerror("自动登录失败", str(e))
            except Exception:
                print("自动登录失败:", e)

    def _login_after_save(self):
        """Log in after a config save and report the outcome in a dialog."""
        try:
            resp, sess_cookie = login()
            if sess_cookie:
                set_sess_key(sess_cookie)
            messagebox.showinfo("登录", f"登录完成, 状态: {resp.status_code}")
        except Exception as e:
            messagebox.showerror("登录失败", str(e))

    # ------------------------------------------------------------------
    # Config form
    # ------------------------------------------------------------------
    def load_values(self):
        """Reload the config, fill the config form and refresh the listbox."""
        self.cfg = load_cfg()
        self._set_entry(self.base_entry, self.cfg.get("base_url", ""))
        self._set_entry(self.th_entry, str(self.cfg.get("conn_threshold", "")))
        self._set_entry(
            self.test_prefix_entry, str(self.cfg.get("test_prefix", "Test_"))
        )
        self._set_entry(
            self.rule_limit_entry, str(self.cfg.get("rule_ip_limit", 1000))
        )
        self._set_entry(self.interval_entry, str(self.cfg.get("scan_interval", 60)))
        self.reload_listbox()

    def save_config(self):
        """Validate the config form, persist it and retry login in background.

        All fields are parsed before ``self.cfg`` is touched, so invalid
        input leaves the in-memory configuration unchanged.
        """
        try:
            new_values = {
                "base_url": self.base_entry.get().strip(),
                "conn_threshold": int(self.th_entry.get().strip()),
                "test_prefix": self.test_prefix_entry.get().strip(),
                "rule_ip_limit": int(self.rule_limit_entry.get().strip()),
                "scan_interval": int(self.interval_entry.get().strip()),
            }
        except ValueError as e:
            messagebox.showerror("错误", f"配置输入无效: {e}")
            return

        self.cfg.update(new_values)
        try:
            save_cfg(self.cfg)
        except (OSError, TypeError, ValueError) as e:
            # Tell the user instead of letting the Tk callback fail silently.
            messagebox.showerror("错误", f"保存配置失败: {e}")
            return

        # The saved settings may affect authentication, so log in again
        # without blocking the GUI.
        threading.Thread(target=self._login_after_save, daemon=True).start()

    # ------------------------------------------------------------------
    # Protector list editor
    # ------------------------------------------------------------------
    def reload_listbox(self):
        """Refill the listbox from ``protector_list.load_list()``."""
        self.listbox.delete(0, tk.END)
        for item in protector_list.load_list():
            label = (
                f"{item['id']}: {item['target_ip']}:{item['src_port']} "
                f"(th={item.get('threshold')})"
            )
            self.listbox.insert(tk.END, label)

    def on_select(self, evt):
        """Copy the selected entry's fields into the editor on the right.

        Args:
            evt: The ``<<ListboxSelect>>`` event (unused).
        """
        sel = self.listbox.curselection()
        if not sel:
            return
        item = self._item_at(sel[0])
        if item is None:
            return
        self._set_entry(self.ip_entry, self._optional_text(item.get("target_ip")))
        self._set_entry(self.port_entry, self._optional_text(item.get("src_port")))
        # A missing/None threshold must show as blank, not the text "None".
        self._set_entry(
            self.entry_threshold, self._optional_text(item.get("threshold"))
        )

    def add_entry(self):
        """Add a new protector entry from the editor fields."""
        ip = self.ip_entry.get().strip()
        port = self.port_entry.get().strip()
        if not ip or not port:
            messagebox.showerror("错误", "ip 与 port 为必填")
            return
        try:
            threshold = self._parse_optional_int(self.entry_threshold.get())
            entry = protector_list.add_entry(ip, int(port), threshold)
            self.reload_listbox()
            messagebox.showinfo("添加", f"已添加: {entry}")
        except Exception as e:
            messagebox.showerror("错误", str(e))

    def update_entry(self):
        """Update the selected protector entry with the editor fields."""
        sel = self.listbox.curselection()
        if not sel:
            messagebox.showerror("错误", "先选择一条")
            return
        item = self._item_at(sel[0])
        if item is None:
            return
        eid = item["id"]

        ip = self.ip_entry.get().strip()
        port = self.port_entry.get().strip()
        # Same required-field rule as add_entry.
        if not ip or not port:
            messagebox.showerror("错误", "ip 与 port 为必填")
            return
        try:
            updated = protector_list.update_entry(
                eid,
                target_ip=ip,
                src_port=int(port),
                threshold=self._parse_optional_int(self.entry_threshold.get()),
            )
            if updated is None:
                messagebox.showerror("错误", "更新失败")
            else:
                self.reload_listbox()
                messagebox.showinfo("更新", f"已更新: {updated}")
        except Exception as e:
            messagebox.showerror("错误", str(e))

    def remove_entry(self):
        """Remove the selected protector entry."""
        sel = self.listbox.curselection()
        if not sel:
            messagebox.showerror("错误", "先选择一条")
            return
        item = self._item_at(sel[0])
        if item is None:
            return
        eid = item["id"]
        try:
            ok = protector_list.remove_entry(eid)
        except Exception as e:
            # Report failures the same way add_entry/update_entry do.
            messagebox.showerror("错误", str(e))
            return
        if ok:
            self.reload_listbox()
            messagebox.showinfo("删除", "已删除")
        else:
            messagebox.showerror("错误", "删除失败")

    # ------------------------------------------------------------------
    # Countdown
    # ------------------------------------------------------------------
    def _update_countdown(self):
        """Update countdown label every second and trigger protector run when it reaches zero."""
        try:
            if not self.protector_runner:
                # Nothing to count down for; do not reschedule.
                return
            try:
                self.countdown_var.set(f"下一次检测:{self._countdown_seconds}s")
            except Exception:
                self.countdown_var.set("Protector: running")

            if self._countdown_seconds <= 0:
                # Run the protector off the GUI thread.
                try:
                    threading.Thread(
                        target=self.protector_runner.run_once, daemon=True
                    ).start()
                except Exception as e:
                    print("触发 protector 运行失败:", e)
                # Restart the countdown from the runner's interval, falling
                # back to the initial delay if the runner cannot provide one.
                try:
                    self._countdown_seconds = int(self.protector_runner.get_interval())
                except Exception:
                    self._countdown_seconds = self._initial_delay
            else:
                self._countdown_seconds -= 1
        except Exception as e:
            # Keep ticking even if one update fails.
            print("倒计时更新出错:", e)

        try:
            self.after(1000, self._update_countdown)
        except tk.TclError:
            # The window has been destroyed; there is nothing left to schedule.
            pass
def run_gui(protector_runner=None):
    """Create the ProtectorGUI window and block in its Tk main loop.

    Args:
        protector_runner: Optional runner object forwarded unchanged to
            ``ProtectorGUI``.
    """
    window = ProtectorGUI(protector_runner=protector_runner)
    window.mainloop()
# When executed as a script, open the GUI without a protector runner.
if __name__ == "__main__":
    run_gui()
|