"""Command-line helper that posts an ACL "add" request to ``<base_url>/Action/call``.

The base URL is read from ``config.json`` next to this file and the session
cookie is taken from ``session_state.get_sess_key()``.
"""

import json
import sys
from pathlib import Path
from typing import Any, Optional, Tuple

import requests

from log_util import get_logger, log_request
from session_state import get_sess_key

ROOT = Path(__file__).resolve().parent
CONFIG_PATH = ROOT / "config.json"


def load_config():
    """Load and return the parsed contents of ``config.json``.

    Returns:
        Whatever ``json.load`` produces for the file (callers in this module
        use it as a dict and read the ``base_url`` key).

    Raises:
        FileNotFoundError: If ``CONFIG_PATH`` does not exist.
    """
    if not CONFIG_PATH.exists():
        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
        return json.load(f)


def _extract_sess_value(raw: str) -> str:
    """Return the bare cookie value from a stored ``sess_key`` string.

    Accepts ``"sess_key=value;"``, ``"sess_key=value"``, a cookie string with
    extra ``;``-separated attributes, or a bare value without the
    ``sess_key=`` prefix.

    Raises:
        ValueError: If no non-empty value can be extracted.
    """
    for segment in raw.split(";"):
        # partition() splits on the FIRST "=" only, so values that themselves
        # contain "=" (e.g. base64 padding) are kept intact.
        name, sep, value = segment.strip().partition("=")
        if sep and name.strip() == "sess_key":
            value = value.strip()
            if not value:
                raise ValueError("sess_key 格式无效,请重新登录")
            return value

    # No "sess_key=" pair found: treat the text before the first ";" as the
    # value itself instead of failing with IndexError.
    bare = raw.split(";", 1)[0].strip()
    if not bare:
        raise ValueError("sess_key 格式无效,请重新登录")
    return bare


def add_acl_rule(
    dst_addr: str,
    comment: str,
    protocol: str = "any",
    action: str = "drop",
    dir: str = "forward",
    ctdir: str = "0",
    iinterface: str = "any",
    ointerface: str = "any",
    src_addr: str = "",
    src_port: str = "",
    dst_port: str = "",
    enabled: str = "yes",
    week: str = "1234567",
    time: str = "00:00-23:59",
    ip_type: str = "4",
    src6_addr: str = "",
    dst6_addr: str = "",
    src6_mode: str = "",
    dst6_mode: str = "",
    src6_suffix: str = "",
    dst6_suffix: str = "",
    timeout: int = 10
) -> Tuple[requests.Response, Optional[Any]]:
    """Send an ACL "add" POST request and return the response.

    The request body is ``{"func_name": "acl", "action": "add", "param": {...}}``
    where every rule argument below is forwarded unchanged under a key of the
    same name inside ``param``.

    Args:
        dst_addr: Comma-separated destination addresses,
            e.g. ``"1.2.3.4,2.4.5.6"``.
        comment: Comment for the rule, e.g. ``"remark_kkkkkk"``.
        protocol, action, dir, ctdir, iinterface, ointerface, src_addr,
        src_port, dst_port, enabled, week, time, ip_type, src6_addr,
        dst6_addr, src6_mode, dst6_mode, src6_suffix, dst6_suffix:
            Rule fields passed through verbatim; their accepted values are
            defined by the remote API, not by this function.
            NOTE(review): ``dir`` and ``time`` shadow a builtin / stdlib name
            but are kept because they are part of the keyword interface.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        A ``(response, json_data)`` tuple. ``json_data`` is the decoded JSON
        body, or ``None`` when the body is not valid JSON.

    Raises:
        FileNotFoundError: If the config file is missing.
        ValueError: If no session key is available or it is malformed.
        requests.RequestException: If the HTTP request itself fails.
    """
    cfg = load_config()
    base = cfg.get("base_url", "").rstrip("/")
    url = f"{base}/Action/call"

    data = {
        "func_name": "acl",
        "action": "add",
        "param": {
            "protocol": protocol,
            "action": action,
            "dir": dir,
            "ctdir": ctdir,
            "iinterface": iinterface,
            "ointerface": ointerface,
            "src_addr": src_addr,
            "dst_addr": dst_addr,
            "src_port": src_port,
            "dst_port": dst_port,
            "comment": comment,
            "enabled": enabled,
            "week": week,
            "time": time,
            "ip_type": ip_type,
            "src6_addr": src6_addr,
            "dst6_addr": dst6_addr,
            "src6_mode": src6_mode,
            "dst6_mode": dst6_mode,
            "src6_suffix": src6_suffix,
            "dst6_suffix": dst6_suffix,
        },
    }

    logger = get_logger("add_acl_post")

    # Get sess_key from global state.
    sess_key = get_sess_key()
    if not sess_key:
        raise ValueError("未找到 sess_key,请先登录")
    # Parse defensively: a malformed value surfaces as ValueError (which
    # main() reports) rather than an uncaught IndexError.
    cookies = {"sess_key": _extract_sess_value(sess_key)}

    logger.debug(f"准备发送请求,URL: {url}")
    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)

    # Log the request/response; logging is best-effort and must never mask
    # the actual result of the call.
    try:
        log_request(logger, "add_acl_rule", url, data, resp)
    except Exception:
        logger.exception("记录请求/响应失败")

    # Parse the JSON response; a non-JSON body is reported as None.
    try:
        json_data = resp.json()
    except ValueError:
        json_data = None

    return resp, json_data


def main():
    """Add an example ACL rule and print the server's reply.

    Exits with status 2 on a missing config file, 3 on a ValueError (for
    example a missing or malformed session key) and 1 on a request failure.
    """
    logger = get_logger("main")
    try:
        # Example: add an ACL rule to drop traffic to 1.2.3.4 and 2.4.5.6
        # with comment "remark_kkkkkk".
        resp, data = add_acl_rule(dst_addr="1.2.3.4,2.4.5.6", comment="remark_kkkkkk")
    except FileNotFoundError as e:
        logger.error(f"配置错误: {e}")
        sys.exit(2)
    except ValueError as e:
        logger.error(f"会话错误: {e}")
        sys.exit(3)
    except requests.RequestException as e:
        logger.error(f"请求失败: {e}")
        sys.exit(1)

    # Console-friendly output: pretty JSON when available, raw text otherwise.
    logger.info(f"状态: {resp.status_code}")
    if data:
        try:
            pretty = json.dumps(data, ensure_ascii=False, indent=2)
            logger.info(f"响应 JSON:\n{pretty}")
            print(pretty)
        except Exception:
            logger.info(f"响应文本: {resp.text}")
            print(resp.text)
    else:
        logger.info(f"响应文本: {resp.text}")
        print(resp.text)


if __name__ == "__main__":
    main()