import json import sys from pathlib import Path from typing import Optional, Tuple import requests from log_util import get_logger, log_request from auth_session import get_sess_key, get_base_url ROOT = Path(__file__).resolve().parent CONFIG_PATH = ROOT / "config.json" DEFAULT_SHOW_PAYLOAD = { "func_name": "acl", "action": "show", "param": { "TYPE": "total,data", "limit": "0,20", "ORDER_BY": "", "ORDER": "" } } def _make_cookies_from_sess_key(sess_key: str) -> dict: # sess_key may be stored as value (no "sess_key=...;") or as "sess_key=...;". val = sess_key.split("=", 1)[-1].rstrip(";") return {"sess_key": val} def get_acl_rules(payload: Optional[dict] = None, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]: base = get_base_url() url = f"{base}/Action/call" data = payload or DEFAULT_SHOW_PAYLOAD logger = get_logger("acl_post") sess_key = get_sess_key() if not sess_key: raise ValueError("未找到 sess_key,请先登录") cookies = _make_cookies_from_sess_key(sess_key) logger.debug(f"准备发送请求,URL: {url}") resp = requests.post(url, json=data, cookies=cookies, timeout=timeout) try: log_request(logger, "get_acl_rules", url, data, resp) except Exception: logger.exception("记录请求/响应失败") try: json_data = resp.json() except ValueError: json_data = None return resp, json_data def add_acl_rule( dst_addr: str, comment: str, protocol: str = "any", action: str = "drop", dir: str = "forward", ctdir: str = "0", iinterface: str = "any", ointerface: str = "any", src_addr: str = "", src_port: str = "", dst_port: str = "", enabled: str = "yes", week: str = "1234567", time: str = "00:00-23:59", ip_type: str = "4", src6_addr: str = "", dst6_addr: str = "", src6_mode: str = "", dst6_mode: str = "", src6_suffix: str = "", dst6_suffix: str = "", timeout: int = 10, ) -> Tuple[requests.Response, Optional[dict]]: base = get_base_url() url = f"{base}/Action/call" data = { "func_name": "acl", "action": "add", "param": { "protocol": protocol, "action": action, "dir": dir, "ctdir": ctdir, "iinterface": iinterface, "ointerface": ointerface, "src_addr": src_addr, "dst_addr": dst_addr, "src_port": src_port, "dst_port": dst_port, "comment": comment, "enabled": enabled, "week": week, "time": time, "ip_type": ip_type, "src6_addr": src6_addr, "dst6_addr": dst6_addr, "src6_mode": src6_mode, "dst6_mode": dst6_mode, "src6_suffix": src6_suffix, "dst6_suffix": dst6_suffix, }, } logger = get_logger("add_acl_post") sess_key = get_sess_key() if not sess_key: raise ValueError("未找到 sess_key,请先登录") cookies = _make_cookies_from_sess_key(sess_key) logger.debug(f"准备发送请求,URL: {url}") resp = requests.post(url, json=data, cookies=cookies, timeout=timeout) try: log_request(logger, "add_acl_rule", url, data, resp) except Exception: logger.exception("记录请求/响应失败") try: json_data = resp.json() except ValueError: json_data = None return resp, json_data def del_acl_rule(rule_id: int, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]: base = get_base_url() url = f"{base}/Action/call" data = {"func_name": "acl", "action": "del", "param": {"id": rule_id}} logger = get_logger("del_acl_post") sess_key = get_sess_key() if not sess_key: raise ValueError("未找到 sess_key,请先登录") cookies = _make_cookies_from_sess_key(sess_key) logger.debug(f"准备发送请求,URL: {url}") resp = requests.post(url, json=data, cookies=cookies, timeout=timeout) try: log_request(logger, "del_acl_rule", url, data, resp) except Exception: logger.exception("记录请求/响应失败") try: json_data = resp.json() except ValueError: json_data = None return resp, json_data def edit_acl_rule( rule_id: int, dst_addr: str = "", comment: str = "", protocol: str = "any", action: str = "drop", dir: str = "forward", ctdir: int = 0, iinterface: str = "any", ointerface: str = "any", src_addr: str = "", src_port: str = "", dst_port: str = "", enabled: str = "yes", week: str = "1234567", time: str = "00:00-23:59", ip_type: str = "4", src6_addr: str = "", dst6_addr: str = "", src6_mode: int = 0, dst6_mode: int = 0, src6_suffix: str = "", dst6_suffix: str = "", src6_mac: str = "", dst6_mac: str = "", timeout: int = 10, ) -> Tuple[requests.Response, Optional[dict]]: base = get_base_url() url = f"{base}/Action/call" data = { "func_name": "acl", "action": "edit", "param": { "id": rule_id, "protocol": protocol, "action": action, "dir": dir, "ctdir": ctdir, "iinterface": iinterface, "ointerface": ointerface, "src_addr": src_addr, "dst_addr": dst_addr, "src_port": src_port, "dst_port": dst_port, "comment": comment, "enabled": enabled, "week": week, "time": time, "ip_type": ip_type, "src6_addr": src6_addr, "dst6_addr": dst6_addr, "src6_mode": src6_mode, "dst6_mode": dst6_mode, "src6_suffix": src6_suffix, "dst6_suffix": dst6_suffix, "src6_mac": src6_mac, "dst6_mac": dst6_mac, }, } logger = get_logger("edit_acl_post") sess_key = get_sess_key() if not sess_key: raise ValueError("未找到 sess_key,请先登录") cookies = _make_cookies_from_sess_key(sess_key) logger.debug(f"准备发送请求,URL: {url}") resp = requests.post(url, json=data, cookies=cookies, timeout=timeout) try: log_request(logger, "edit_acl_rule", url, data, resp) except Exception: logger.exception("记录请求/响应失败") try: json_data = resp.json() except ValueError: json_data = None return resp, json_data def _print_response(resp: requests.Response, data: Optional[dict], logger_name: str = "main") -> None: logger = get_logger(logger_name) logger.info(f"状态: {resp.status_code}") if data: try: pretty = json.dumps(data, ensure_ascii=False, indent=2) logger.info(f"响应 JSON:\n{pretty}") print(pretty) except Exception: logger.info(f"响应文本: {resp.text}") print(resp.text) else: logger.info(f"响应文本: {resp.text}") print(resp.text) if __name__ == "__main__": # 示例:调用 get_acl_rules try: resp, data = get_acl_rules() _print_response(resp, data) except FileNotFoundError as e: get_logger("main").error(f"配置错误: {e}") sys.exit(2) except ValueError as e: get_logger("main").error(f"会话错误: {e}") sys.exit(3) except requests.RequestException as e: get_logger("main").error(f"请求失败: {e}") sys.exit(1)