"""Edit an ACL rule by POSTing to the ``/Action/call`` endpoint.

The target base URL is read from ``config.json`` next to this file and the
session cookie is taken from the shared session state.
"""
import json
import sys
from pathlib import Path
from typing import Any, Optional, Tuple

import requests

from log_util import get_logger, log_request
from session_state import get_sess_key

ROOT = Path(__file__).resolve().parent
CONFIG_PATH = ROOT / "config.json"

# Name of the session cookie sent with every request.
_SESS_COOKIE_NAME = "sess_key"


def load_config():
    """Load and return the parsed contents of ``config.json``.

    Raises:
        FileNotFoundError: if ``CONFIG_PATH`` does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    if not CONFIG_PATH.exists():
        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
        return json.load(f)


def _extract_sess_value(raw: str) -> str:
    """Return the bare cookie value from a stored session string.

    Accepts ``"sess_key=value;"``, ``"sess_key=value; path=/"`` or a bare
    ``"value"``. Only the first ``=`` separates name from value, so values
    that themselves contain ``=`` (e.g. base64 padding) are kept intact.
    """
    # Keep only the first "name=value" pair; drop any trailing attributes.
    first_pair = raw.strip().split(";", 1)[0].strip()
    name, sep, value = first_pair.partition("=")
    if sep and name.strip() == _SESS_COOKIE_NAME:
        return value.strip()
    # No "sess_key=" prefix: treat the whole token as the value.
    return first_pair


# NOTE: the parameter names ``dir`` and ``time`` shadow a builtin / stdlib
# module name; they are kept because they are part of the public keyword
# interface of this function.
def edit_acl_rule(
    rule_id: int,
    dst_addr: str = "",
    comment: str = "",
    protocol: str = "any",
    action: str = "drop",
    dir: str = "forward",
    ctdir: int = 0,
    iinterface: str = "any",
    ointerface: str = "any",
    src_addr: str = "",
    src_port: str = "",
    dst_port: str = "",
    enabled: str = "yes",
    week: str = "1234567",
    time: str = "00:00-23:59",
    ip_type: str = "4",
    src6_addr: str = "",
    dst6_addr: str = "",
    src6_mode: int = 0,
    dst6_mode: int = 0,
    src6_suffix: str = "",
    dst6_suffix: str = "",
    src6_mac: str = "",
    dst6_mac: str = "",
    timeout: int = 10
) -> Tuple[requests.Response, Optional[Any]]:
    """Send an ACL "edit" POST request and return the response.

    Args:
        rule_id: ID of the ACL rule to edit (required); sent as ``param.id``.
        timeout: Timeout in seconds passed to ``requests.post``.
        All other arguments are forwarded verbatim as same-named keys of the
        ``param`` object. Every field is always sent, so arguments that are
        omitted are transmitted with the default values shown in the
        signature. NOTE(review): whether the server then overwrites those
        fields with the defaults is not visible from here - confirm before
        relying on partial updates.

    Returns:
        A ``(response, json_data)`` tuple where ``json_data`` is the decoded
        JSON body, or ``None`` when the body is not valid JSON.

    Raises:
        FileNotFoundError: if the configuration file is missing.
        ValueError: if no usable session key is available.
        requests.RequestException: if the HTTP request fails.
    """
    cfg = load_config()
    base = cfg.get("base_url", "").rstrip("/")
    url = f"{base}/Action/call"

    data = {
        "func_name": "acl",
        "action": "edit",
        "param": {
            "id": rule_id,
            "protocol": protocol,
            "action": action,
            "dir": dir,
            "ctdir": ctdir,
            "iinterface": iinterface,
            "ointerface": ointerface,
            "src_addr": src_addr,
            "dst_addr": dst_addr,
            "src_port": src_port,
            "dst_port": dst_port,
            "comment": comment,
            "enabled": enabled,
            "week": week,
            "time": time,
            "ip_type": ip_type,
            "src6_addr": src6_addr,
            "dst6_addr": dst6_addr,
            "src6_mode": src6_mode,
            "dst6_mode": dst6_mode,
            "src6_suffix": src6_suffix,
            "dst6_suffix": dst6_suffix,
            "src6_mac": src6_mac,
            "dst6_mac": dst6_mac,
        },
    }

    logger = get_logger("edit_acl_post")

    # The session key comes from the shared session state.
    sess_key = get_sess_key()
    if not sess_key:
        raise ValueError("未找到 sess_key,请先登录")

    # An empty value after parsing ("sess_key=;") is as unusable as a
    # missing key, so it is reported the same way.
    sess_value = _extract_sess_value(sess_key)
    if not sess_value:
        raise ValueError("未找到 sess_key,请先登录")
    cookies = {_SESS_COOKIE_NAME: sess_value}

    logger.debug(f"准备发送请求,URL: {url}")
    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)

    # Request/response logging is best-effort: a logging failure must not
    # hide the actual response from the caller.
    try:
        log_request(logger, "edit_acl_rule", url, data, resp)
    except Exception:
        logger.exception("记录请求/响应失败")

    # A non-JSON body is not an error here; the caller still gets ``resp``.
    try:
        json_data = resp.json()
    except ValueError:
        json_data = None

    return resp, json_data


def main():
    """Run an example edit and print the response.

    Exit codes: 1 for a failed request, 2 for a configuration problem,
    3 for a missing session.
    """
    logger = get_logger("main")
    try:
        # Example: edit ACL rule 14, setting dst_addr to "1.2.3.4" and
        # comment to "remark_kkkkkk".
        resp, data = edit_acl_rule(
            rule_id=14, dst_addr="1.2.3.4", comment="remark_kkkkkk"
        )
    except requests.RequestException as e:
        # Must precede ``ValueError``: several requests errors (MissingSchema,
        # InvalidURL, ...) subclass both and would otherwise be misreported
        # as session errors.
        logger.error(f"请求失败: {e}")
        sys.exit(1)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        # A corrupt config file is a configuration problem, not a session one
        # (JSONDecodeError is a ValueError, so it must be caught first).
        logger.error(f"配置错误: {e}")
        sys.exit(2)
    except ValueError as e:
        logger.error(f"会话错误: {e}")
        sys.exit(3)

    # Console-friendly output.
    logger.info(f"状态: {resp.status_code}")
    # ``is not None`` so that valid but falsy JSON ({} / [] / 0 / false) is
    # still rendered as JSON.
    if data is not None:
        try:
            pretty = json.dumps(data, ensure_ascii=False, indent=2)
        except (TypeError, ValueError):
            logger.info(f"响应文本: {resp.text}")
            print(resp.text)
        else:
            logger.info(f"响应 JSON:\n{pretty}")
            print(pretty)
    else:
        logger.info(f"响应文本: {resp.text}")
        print(resp.text)


if __name__ == "__main__":
    main()