| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- """A simple tkinter GUI to edit config.json and manage protector list.
- Features:
- - Edit base_url, conn_threshold, scan_interval (saved to config.json)
- - Manage protector_list entries (add/edit/remove) stored in protector_list.json
- - On saving base_url, attempt login via auth_session.login() and store sess_key
- """
- from __future__ import annotations
- import json
- import threading
- import time
- import tkinter as tk
- from tkinter import messagebox
- from pathlib import Path
- from auth_session import load_config, get_base_url, login, set_sess_key
- import protector_list
- ROOT = Path(__file__).resolve().parent
- CONFIG_PATH = ROOT / "config.json"
- def load_cfg():
- return load_config()
- def save_cfg(cfg: dict):
- with open(CONFIG_PATH, "w", encoding="utf-8") as f:
- json.dump(cfg, f, ensure_ascii=False, indent=2)
- class ProtectorGUI(tk.Tk):
- def __init__(self, protector_runner=None):
- super().__init__()
- self.title("Protector GUI")
- self.geometry("700x500")
- self.cfg = load_cfg()
- # Config frame
- cf = tk.LabelFrame(self, text="Config")
- cf.pack(fill="x", padx=8, pady=6)
- tk.Label(cf, text="base_url:").grid(row=0, column=0, sticky="w")
- self.base_entry = tk.Entry(cf, width=60)
- self.base_entry.grid(row=0, column=1, padx=4, pady=2)
- tk.Label(cf, text="conn_threshold:").grid(row=1, column=0, sticky="w")
- self.th_entry = tk.Entry(cf, width=10)
- self.th_entry.grid(row=1, column=1, sticky="w", padx=4, pady=2)
- tk.Label(cf, text="test_prefix:").grid(row=1, column=2, sticky="w", padx=(12,0))
- self.test_prefix_entry = tk.Entry(cf, width=15)
- self.test_prefix_entry.grid(row=1, column=3, sticky="w", padx=4, pady=2)
- tk.Label(cf, text="rule_ip_limit:").grid(row=2, column=2, sticky="w", padx=(12,0))
- self.rule_limit_entry = tk.Entry(cf, width=15)
- self.rule_limit_entry.grid(row=2, column=3, sticky="w", padx=4, pady=2)
- tk.Label(cf, text="scan_interval (s):").grid(row=2, column=0, sticky="w")
- self.interval_entry = tk.Entry(cf, width=10)
- self.interval_entry.grid(row=2, column=1, sticky="w", padx=4, pady=2)
- tk.Button(cf, text="Save Config", command=self.save_config).grid(row=3, column=1, sticky="w", pady=6)
- # Protector list frame
- lf = tk.LabelFrame(self, text="Protector List")
- lf.pack(fill="both", expand=True, padx=8, pady=6)
- self.listbox = tk.Listbox(lf)
- self.listbox.pack(side="left", fill="both", expand=True, padx=4, pady=4)
- self.listbox.bind("<<ListboxSelect>>", self.on_select)
- right = tk.Frame(lf)
- right.pack(side="right", fill="y", padx=4)
- tk.Label(right, text="target_ip:").pack(anchor="w")
- self.ip_entry = tk.Entry(right)
- self.ip_entry.pack(fill="x")
- tk.Label(right, text="src_port:").pack(anchor="w")
- self.port_entry = tk.Entry(right)
- self.port_entry.pack(fill="x")
- tk.Label(right, text="threshold (optional):").pack(anchor="w")
- self.entry_threshold = tk.Entry(right)
- self.entry_threshold.pack(fill="x")
- tk.Button(right, text="Add", command=self.add_entry).pack(fill="x", pady=4)
- tk.Button(right, text="Update", command=self.update_entry).pack(fill="x", pady=4)
- tk.Button(right, text="Remove", command=self.remove_entry).pack(fill="x", pady=4)
- self.load_values()
- # 自动在后台尝试登录(启动后立即)
- def _auto_login():
- try:
- resp, sess_cookie = login()
- if sess_cookie:
- set_sess_key(sess_cookie)
- # 不弹出大量通知,只有在需要时可打开下面这一行
- # messagebox.showinfo("登录", f"自动登录完成, 状态: {resp.status_code}")
- except Exception as e:
- # 登录失败不阻塞 GUI,记录到控制台/弹出一条错误提示
- try:
- messagebox.showerror("自动登录失败", str(e))
- except Exception:
- print("自动登录失败:", e)
- threading.Thread(target=_auto_login, daemon=True).start()
- # Protector runner instance (optional) for countdown and triggering runs
- self.protector_runner = protector_runner
- # countdown label
- self.countdown_var = tk.StringVar(value="Protector: idle")
- self.countdown_label = tk.Label(self, textvariable=self.countdown_var)
- self.countdown_label.pack(anchor="ne", padx=8, pady=4)
- # Pause/Resume button
- self.pause_btn = None
- if self.protector_runner is not None:
- self.pause_btn = tk.Button(self, text="Pause", command=self._toggle_pause)
- self.pause_btn.pack(anchor="ne", padx=8)
- # Blocked IPs frame (uses advanced_acl to query current Test_ prefixed rules)
- bf = tk.LabelFrame(self, text="Blocked IPs (来自高级 ACL)")
- bf.pack(fill="both", expand=False, padx=8, pady=6)
- self.block_listbox = tk.Listbox(bf, height=8)
- self.block_listbox.pack(side="left", fill="both", expand=True, padx=4, pady=4)
- block_right = tk.Frame(bf)
- block_right.pack(side="right", fill="y", padx=4)
- tk.Button(block_right, text="刷新阻断列表", command=self.refresh_blocked_ips).pack(fill="x", pady=2)
- tk.Label(block_right, text="手动加入 IP:").pack(anchor="w", pady=(8,0))
- self.manual_ip_entry = tk.Entry(block_right)
- self.manual_ip_entry.pack(fill="x")
- tk.Button(block_right, text="加入阻断", command=self.manual_add_ip).pack(fill="x", pady=2)
- tk.Button(block_right, text="从列表删除选中", command=self.manual_remove_selected).pack(fill="x", pady=2)
- # Register on_run_complete callback only after GUI widgets exist
- if self.protector_runner is not None:
- def _on_protector_run_complete():
- # schedule a GUI-thread refresh; guard if widget removed
- try:
- if hasattr(self, "block_listbox") and self.block_listbox is not None:
- self.after(100, self.refresh_blocked_ips)
- except Exception:
- pass
- try:
- self.protector_runner.on_run_complete = _on_protector_run_complete
- except Exception:
- # ignore if unable to set
- pass
- # start initial delayed run countdown (10 seconds) if runner provided
- if self.protector_runner is not None:
- self._initial_delay = 10
- self._countdown_seconds = self._initial_delay
- # schedule countdown update every 1 second via tkinter's after
- self.after(1000, self._update_countdown)
- def load_values(self):
- self.cfg = load_cfg()
- self.base_entry.delete(0, tk.END)
- self.base_entry.insert(0, self.cfg.get("base_url", ""))
- self.th_entry.delete(0, tk.END)
- self.th_entry.insert(0, str(self.cfg.get("conn_threshold", "")))
- # new fields
- self.test_prefix_entry.delete(0, tk.END)
- self.test_prefix_entry.insert(0, str(self.cfg.get("test_prefix", "Test_")))
- self.rule_limit_entry.delete(0, tk.END)
- self.rule_limit_entry.insert(0, str(self.cfg.get("rule_ip_limit", 1000)))
- self.interval_entry.delete(0, tk.END)
- self.interval_entry.insert(0, str(self.cfg.get("scan_interval", 60)))
- self.reload_listbox()
- def reload_listbox(self):
- self.listbox.delete(0, tk.END)
- items = protector_list.load_list()
- for i in items:
- self.listbox.insert(tk.END, f"{i['id']}: {i['target_ip']}:{i['src_port']} (th={i.get('threshold')})")
- # do NOT auto-refresh blocked IPs here (only refresh on explicit action or protector run)
- def refresh_blocked_ips(self):
- """Query advanced_acl.get_all_test_ips() and update the block_listbox."""
- # defensive: ensure listbox exists
- if not hasattr(self, "block_listbox") or self.block_listbox is None:
- # nothing to refresh
- return
- try:
- import advanced_acl
- except Exception as e:
- messagebox.showerror("错误", f"无法导入 advanced_acl: {e}")
- return
- try:
- # clear listbox
- try:
- self.block_listbox.delete(0, tk.END)
- except Exception:
- # if delete fails, just recreate a fresh listbox entry
- pass
- ips = advanced_acl.get_all_test_ips()
- # ips expected as list of dicts or simple ip strings
- if isinstance(ips, dict):
- # maybe returned as {"ips": [...]}
- ips_list = ips.get("ips") or ips.get("data") or []
- else:
- ips_list = ips or []
- for it in ips_list:
- if isinstance(it, dict):
- ip = it.get("ip") or it.get("dst_addr") or str(it)
- comment = it.get("comment") or it.get("rule") or ""
- self.block_listbox.insert(tk.END, f"{ip} {comment}")
- else:
- self.block_listbox.insert(tk.END, str(it))
- except Exception as e:
- # show more helpful error including type
- messagebox.showerror("错误", f"获取阻断列表失败: {type(e).__name__}: {e}")
- def manual_add_ip(self):
- ip = self.manual_ip_entry.get().strip()
- if not ip:
- messagebox.showerror("错误", "请输入要阻断的 IP")
- return
- try:
- import advanced_acl
- res = advanced_acl.add_ip(ip)
- # best-effort feedback
- messagebox.showinfo("加入阻断", f"尝试加入 {ip}: {res}")
- self.refresh_blocked_ips()
- except Exception as e:
- messagebox.showerror("错误", f"加入阻断失败: {e}")
- def manual_remove_selected(self):
- sel = self.block_listbox.curselection()
- if not sel:
- messagebox.showerror("错误", "先选择要删除的项")
- return
- idx = sel[0]
- item = self.block_listbox.get(idx)
- # try to parse ip from the line (assume leading token)
- ip = item.split()[0]
- try:
- import advanced_acl
- res = advanced_acl.del_ip(ip)
- messagebox.showinfo("删除阻断", f"尝试删除 {ip}: {res}")
- self.refresh_blocked_ips()
- except Exception as e:
- messagebox.showerror("错误", f"删除阻断失败: {e}")
- def save_config(self):
- try:
- self.cfg["base_url"] = self.base_entry.get().strip()
- self.cfg["conn_threshold"] = int(self.th_entry.get().strip())
- # optional new fields
- self.cfg["test_prefix"] = str(self.test_prefix_entry.get().strip())
- self.cfg["rule_ip_limit"] = int(self.rule_limit_entry.get().strip())
- self.cfg["scan_interval"] = int(self.interval_entry.get().strip())
- except Exception as e:
- messagebox.showerror("错误", f"配置输入无效: {e}")
- return
- save_cfg(self.cfg)
- # try login on a background thread
- def _login():
- try:
- resp, sess_cookie = login()
- if sess_cookie:
- set_sess_key(sess_cookie)
- messagebox.showinfo("登录", f"登录完成, 状态: {resp.status_code}")
- except Exception as e:
- messagebox.showerror("登录失败", str(e))
- threading.Thread(target=_login, daemon=True).start()
- def on_select(self, evt):
- sel = self.listbox.curselection()
- if not sel:
- return
- idx = sel[0]
- items = protector_list.load_list()
- if idx >= len(items):
- return
- item = items[idx]
- self.ip_entry.delete(0, tk.END)
- self.ip_entry.insert(0, item.get("target_ip", ""))
- self.port_entry.delete(0, tk.END)
- self.port_entry.insert(0, str(item.get("src_port", "")))
- self.entry_threshold.delete(0, tk.END)
- self.entry_threshold.insert(0, str(item.get("threshold", "")))
- def add_entry(self):
- ip = self.ip_entry.get().strip()
- port = self.port_entry.get().strip()
- th = self.entry_threshold.get().strip() or None
- if not ip or not port:
- messagebox.showerror("错误", "ip 与 port 为必填")
- return
- try:
- entry = protector_list.add_entry(ip, int(port), int(th) if th else None)
- self.reload_listbox()
- messagebox.showinfo("添加", f"已添加: {entry}")
- except Exception as e:
- messagebox.showerror("错误", str(e))
- def update_entry(self):
- sel = self.listbox.curselection()
- if not sel:
- messagebox.showerror("错误", "先选择一条")
- return
- idx = sel[0]
- items = protector_list.load_list()
- if idx >= len(items):
- return
- eid = items[idx]["id"]
- ip = self.ip_entry.get().strip()
- port = self.port_entry.get().strip()
- th = self.entry_threshold.get().strip() or None
- try:
- updated = protector_list.update_entry(eid, target_ip=ip, src_port=int(port), threshold=(int(th) if th else None))
- if updated is None:
- messagebox.showerror("错误", "更新失败")
- else:
- self.reload_listbox()
- messagebox.showinfo("更新", f"已更新: {updated}")
- except Exception as e:
- messagebox.showerror("错误", str(e))
- def remove_entry(self):
- sel = self.listbox.curselection()
- if not sel:
- messagebox.showerror("错误", "先选择一条")
- return
- idx = sel[0]
- items = protector_list.load_list()
- if idx >= len(items):
- return
- eid = items[idx]["id"]
- ok = protector_list.remove_entry(eid)
- if ok:
- self.reload_listbox()
- messagebox.showinfo("删除", "已删除")
- else:
- messagebox.showerror("错误", "删除失败")
- def _update_countdown(self):
- """Update countdown label every second and trigger protector run when it reaches zero."""
- try:
- if not self.protector_runner:
- return
- # Update label
- # compute seconds until next run based on protector_runner's schedule
- try:
- if self.protector_runner.is_paused():
- self.countdown_var.set("Protector: paused")
- else:
- next_ts = self.protector_runner.get_next_run_time()
- if next_ts is None:
- # fallback to internal counter
- secs = max(0, int(self._countdown_seconds))
- else:
- secs = max(0, int(round(next_ts - time.time())))
- self.countdown_var.set(f"下一次检测:{secs}s")
- except Exception:
- self.countdown_var.set("Protector: running")
- # if protector hasn't been started yet, use internal countdown
- try:
- started = getattr(self.protector_runner, "_started", False)
- except Exception:
- started = False
- if not started:
- # use internal countdown to start
- if self._countdown_seconds <= 0:
- do_start = True
- else:
- do_start = False
- else:
- # protector already started; rely on its internal schedule
- do_start = False
- if do_start:
- # If protector runner not started yet, start its loop first
- try:
- if not getattr(self.protector_runner, "_started", False):
- # start the background loop
- self.protector_runner._started = True
- threading.Thread(target=self.protector_runner.start, daemon=True).start()
- except Exception:
- pass
- # ProtectorRunner.start() will perform an initial run, so do not
- # explicitly call run_once() here to avoid double execution.
- # reset internal countdown to interval after start
- try:
- self._countdown_seconds = int(self.protector_runner.get_interval())
- except Exception:
- self._countdown_seconds = self._initial_delay
- # reset countdown to configured interval
- try:
- self._countdown_seconds = int(self.protector_runner.get_interval())
- except Exception:
- self._countdown_seconds = self._initial_delay
- # update pause button text
- try:
- if self.pause_btn:
- self.pause_btn.config(text=("Resume" if self.protector_runner.is_paused() else "Pause"))
- except Exception:
- pass
- else:
- # when protector already started, we won't decrement internal counter
- # keep internal counter but keep ticking for display purposes
- if not started:
- self._countdown_seconds -= 1
- except Exception as e:
- # keep ticking even on error
- print("倒计时更新出错:", e)
- # schedule next tick
- try:
- self.after(1000, self._update_countdown)
- except Exception:
- pass
- def _toggle_pause(self):
- if not self.protector_runner:
- return
- try:
- if self.protector_runner.is_paused():
- self.protector_runner.resume()
- if self.pause_btn:
- self.pause_btn.config(text="Pause")
- else:
- self.protector_runner.pause()
- if self.pause_btn:
- self.pause_btn.config(text="Resume")
- except Exception as e:
- messagebox.showerror("错误", f"切换暂停/恢复失败: {e}")
- def run_gui(protector_runner=None):
- app = ProtectorGUI(protector_runner=protector_runner)
- app.mainloop()
- if __name__ == "__main__":
- run_gui()
|