Просмотр исходного кода

feat(acl): 集成ACL规则管理功能

- 添加ACL规则的获取、新增、编辑和删除接口调用
- 实现ACL接口调用的日志记录和状态打印
- 在主流程中集成ACL功能测试逻辑
- 引入相关ACL操作模块依赖
- 增加异常处理以提升接口调用稳定性
- 输出ACL规则总数及操作结果信息
mcbaiyun 2 месяцев назад
Родитель
Сommit
fff85062b4
5 измененных файлов с 545 добавлено и 0 удалено
  1. 101 0
      acl_post.py
  2. 141 0
      add_acl_post.py
  3. 96 0
      del_acl_post.py
  4. 146 0
      edit_acl_post.py
  5. 61 0
      main.py

+ 101 - 0
acl_post.py

@@ -0,0 +1,101 @@
+import json
+import sys
+from pathlib import Path
+
+import requests
+
+from log_util import get_logger, log_request
+from session_state import get_sess_key
+
+
+ROOT = Path(__file__).resolve().parent
+CONFIG_PATH = ROOT / "config.json"
+
+
+def load_config():
+    if not CONFIG_PATH.exists():
+        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
+    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+DEFAULT_PAYLOAD = {
+    "func_name": "acl",
+    "action": "show",
+    "param": {
+        "TYPE": "total,data",
+        "limit": "0,20",
+        "ORDER_BY": "",
+        "ORDER": ""
+    }
+}
+
+
+def get_acl_rules(payload: dict | None = None, timeout: int = 10):
+    """Send ACL show POST. Returns requests.Response and parsed JSON data.
+
+    payload: JSON payload to send. If None, uses DEFAULT_PAYLOAD.
+    timeout: request timeout in seconds.
+    """
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/call"
+    data = payload or DEFAULT_PAYLOAD
+    logger = get_logger("acl_post")
+
+    # Get sess_key from global state
+    sess_key = get_sess_key()
+    if not sess_key:
+        raise ValueError("未找到 sess_key,请先登录")
+
+    cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")}  # Extract value from "sess_key=value;"
+
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
+
+    # 记录请求/响应
+    try:
+        log_request(logger, "get_acl_rules", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    # Parse JSON response
+    try:
+        json_data = resp.json()
+    except ValueError:
+        json_data = None
+
+    return resp, json_data
+
+
+def main():
+    logger = get_logger("main")
+    try:
+        resp, data = get_acl_rules()
+    except FileNotFoundError as e:
+        logger.error(f"配置错误: {e}")
+        sys.exit(2)
+    except ValueError as e:
+        logger.error(f"会话错误: {e}")
+        sys.exit(3)
+    except requests.RequestException as e:
+        logger.error(f"请求失败: {e}")
+        sys.exit(1)
+
+    # 控制台友好输出
+    logger.info(f"状态: {resp.status_code}")
+    if data:
+        try:
+            pretty = json.dumps(data, ensure_ascii=False, indent=2)
+            logger.info(f"响应 JSON:\n{pretty}")
+            print(pretty)
+        except Exception:
+            logger.info(f"响应文本: {resp.text}")
+            print(resp.text)
+    else:
+        logger.info(f"响应文本: {resp.text}")
+        print(resp.text)
+
+
+if __name__ == "__main__":
+    main()

+ 141 - 0
add_acl_post.py

@@ -0,0 +1,141 @@
+import json
+import sys
+from pathlib import Path
+from typing import Optional
+
+import requests
+
+from log_util import get_logger, log_request
+from session_state import get_sess_key
+
+
+ROOT = Path(__file__).resolve().parent
+CONFIG_PATH = ROOT / "config.json"
+
+
+def load_config():
+    if not CONFIG_PATH.exists():
+        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
+    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def add_acl_rule(
+    dst_addr: str,
+    comment: str,
+    protocol: str = "any",
+    action: str = "drop",
+    dir: str = "forward",
+    ctdir: str = "0",
+    iinterface: str = "any",
+    ointerface: str = "any",
+    src_addr: str = "",
+    src_port: str = "",
+    dst_port: str = "",
+    enabled: str = "yes",
+    week: str = "1234567",
+    time: str = "00:00-23:59",
+    ip_type: str = "4",
+    src6_addr: str = "",
+    dst6_addr: str = "",
+    src6_mode: str = "",
+    dst6_mode: str = "",
+    src6_suffix: str = "",
+    dst6_suffix: str = "",
+    timeout: int = 10
+):
+    """Send ACL add POST. Returns requests.Response and parsed JSON data.
+
+    dst_addr: Comma-separated destination addresses, e.g., "1.2.3.4,2.4.5.6"
+    comment: Comment for the rule, e.g., "remark_kkkkkk"
+    Other parameters have defaults based on the example.
+    """
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/call"
+    data = {
+        "func_name": "acl",
+        "action": "add",
+        "param": {
+            "protocol": protocol,
+            "action": action,
+            "dir": dir,
+            "ctdir": ctdir,
+            "iinterface": iinterface,
+            "ointerface": ointerface,
+            "src_addr": src_addr,
+            "dst_addr": dst_addr,
+            "src_port": src_port,
+            "dst_port": dst_port,
+            "comment": comment,
+            "enabled": enabled,
+            "week": week,
+            "time": time,
+            "ip_type": ip_type,
+            "src6_addr": src6_addr,
+            "dst6_addr": dst6_addr,
+            "src6_mode": src6_mode,
+            "dst6_mode": dst6_mode,
+            "src6_suffix": src6_suffix,
+            "dst6_suffix": dst6_suffix
+        }
+    }
+    logger = get_logger("add_acl_post")
+
+    # Get sess_key from global state
+    sess_key = get_sess_key()
+    if not sess_key:
+        raise ValueError("未找到 sess_key,请先登录")
+
+    cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")}  # Extract value from "sess_key=value;"
+
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
+
+    # 记录请求/响应
+    try:
+        log_request(logger, "add_acl_rule", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    # Parse JSON response
+    try:
+        json_data = resp.json()
+    except ValueError:
+        json_data = None
+
+    return resp, json_data
+
+
+def main():
+    logger = get_logger("main")
+    try:
+        # Example: Add ACL rule to drop traffic to 1.2.3.4 and 2.4.5.6 with comment "remark_kkkkkk"
+        resp, data = add_acl_rule(dst_addr="1.2.3.4,2.4.5.6", comment="remark_kkkkkk")
+    except FileNotFoundError as e:
+        logger.error(f"配置错误: {e}")
+        sys.exit(2)
+    except ValueError as e:
+        logger.error(f"会话错误: {e}")
+        sys.exit(3)
+    except requests.RequestException as e:
+        logger.error(f"请求失败: {e}")
+        sys.exit(1)
+
+    # 控制台友好输出
+    logger.info(f"状态: {resp.status_code}")
+    if data:
+        try:
+            pretty = json.dumps(data, ensure_ascii=False, indent=2)
+            logger.info(f"响应 JSON:\n{pretty}")
+            print(pretty)
+        except Exception:
+            logger.info(f"响应文本: {resp.text}")
+            print(resp.text)
+    else:
+        logger.info(f"响应文本: {resp.text}")
+        print(resp.text)
+
+
+if __name__ == "__main__":
+    main()

+ 96 - 0
del_acl_post.py

@@ -0,0 +1,96 @@
+import json
+import sys
+from pathlib import Path
+from typing import Optional
+
+import requests
+
+from log_util import get_logger, log_request
+from session_state import get_sess_key
+
+
+ROOT = Path(__file__).resolve().parent
+CONFIG_PATH = ROOT / "config.json"
+
+
+def load_config():
+    if not CONFIG_PATH.exists():
+        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
+    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def del_acl_rule(rule_id: int, timeout: int = 10):
+    """Send ACL delete POST. Returns requests.Response and parsed JSON data.
+
+    rule_id: The ID of the ACL rule to delete (required).
+    """
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/call"
+    data = {
+        "func_name": "acl",
+        "action": "del",
+        "param": {
+            "id": rule_id
+        }
+    }
+    logger = get_logger("del_acl_post")
+
+    # Get sess_key from global state
+    sess_key = get_sess_key()
+    if not sess_key:
+        raise ValueError("未找到 sess_key,请先登录")
+
+    cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")}  # Extract value from "sess_key=value;"
+
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
+
+    # 记录请求/响应
+    try:
+        log_request(logger, "del_acl_rule", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    # Parse JSON response
+    try:
+        json_data = resp.json()
+    except ValueError:
+        json_data = None
+
+    return resp, json_data
+
+
+def main():
+    logger = get_logger("main")
+    try:
+        # Example: Delete ACL rule with ID 14
+        resp, data = del_acl_rule(rule_id=14)
+    except FileNotFoundError as e:
+        logger.error(f"配置错误: {e}")
+        sys.exit(2)
+    except ValueError as e:
+        logger.error(f"会话错误: {e}")
+        sys.exit(3)
+    except requests.RequestException as e:
+        logger.error(f"请求失败: {e}")
+        sys.exit(1)
+
+    # 控制台友好输出
+    logger.info(f"状态: {resp.status_code}")
+    if data:
+        try:
+            pretty = json.dumps(data, ensure_ascii=False, indent=2)
+            logger.info(f"响应 JSON:\n{pretty}")
+            print(pretty)
+        except Exception:
+            logger.info(f"响应文本: {resp.text}")
+            print(resp.text)
+    else:
+        logger.info(f"响应文本: {resp.text}")
+        print(resp.text)
+
+
+if __name__ == "__main__":
+    main()

+ 146 - 0
edit_acl_post.py

@@ -0,0 +1,146 @@
+import json
+import sys
+from pathlib import Path
+from typing import Optional
+
+import requests
+
+from log_util import get_logger, log_request
+from session_state import get_sess_key
+
+
+ROOT = Path(__file__).resolve().parent
+CONFIG_PATH = ROOT / "config.json"
+
+
+def load_config():
+    if not CONFIG_PATH.exists():
+        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
+    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def edit_acl_rule(
+    rule_id: int,
+    dst_addr: str = "",
+    comment: str = "",
+    protocol: str = "any",
+    action: str = "drop",
+    dir: str = "forward",
+    ctdir: int = 0,
+    iinterface: str = "any",
+    ointerface: str = "any",
+    src_addr: str = "",
+    src_port: str = "",
+    dst_port: str = "",
+    enabled: str = "yes",
+    week: str = "1234567",
+    time: str = "00:00-23:59",
+    ip_type: str = "4",
+    src6_addr: str = "",
+    dst6_addr: str = "",
+    src6_mode: int = 0,
+    dst6_mode: int = 0,
+    src6_suffix: str = "",
+    dst6_suffix: str = "",
+    src6_mac: str = "",
+    dst6_mac: str = "",
+    timeout: int = 10
+):
+    """Send ACL edit POST. Returns requests.Response and parsed JSON data.
+
+    rule_id: The ID of the ACL rule to edit (required).
+    Other parameters are optional and will update the rule if provided.
+    """
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/call"
+    data = {
+        "func_name": "acl",
+        "action": "edit",
+        "param": {
+            "id": rule_id,
+            "protocol": protocol,
+            "action": action,
+            "dir": dir,
+            "ctdir": ctdir,
+            "iinterface": iinterface,
+            "ointerface": ointerface,
+            "src_addr": src_addr,
+            "dst_addr": dst_addr,
+            "src_port": src_port,
+            "dst_port": dst_port,
+            "comment": comment,
+            "enabled": enabled,
+            "week": week,
+            "time": time,
+            "ip_type": ip_type,
+            "src6_addr": src6_addr,
+            "dst6_addr": dst6_addr,
+            "src6_mode": src6_mode,
+            "dst6_mode": dst6_mode,
+            "src6_suffix": src6_suffix,
+            "dst6_suffix": dst6_suffix,
+            "src6_mac": src6_mac,
+            "dst6_mac": dst6_mac
+        }
+    }
+    logger = get_logger("edit_acl_post")
+
+    # Get sess_key from global state
+    sess_key = get_sess_key()
+    if not sess_key:
+        raise ValueError("未找到 sess_key,请先登录")
+
+    cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")}  # Extract value from "sess_key=value;"
+
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
+
+    # 记录请求/响应
+    try:
+        log_request(logger, "edit_acl_rule", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    # Parse JSON response
+    try:
+        json_data = resp.json()
+    except ValueError:
+        json_data = None
+
+    return resp, json_data
+
+
+def main():
+    logger = get_logger("main")
+    try:
+        # Example: Edit ACL rule with ID 14, change dst_addr to "1.2.3.4" and comment to "remark_kkkkkk"
+        resp, data = edit_acl_rule(rule_id=14, dst_addr="1.2.3.4", comment="remark_kkkkkk")
+    except FileNotFoundError as e:
+        logger.error(f"配置错误: {e}")
+        sys.exit(2)
+    except ValueError as e:
+        logger.error(f"会话错误: {e}")
+        sys.exit(3)
+    except requests.RequestException as e:
+        logger.error(f"请求失败: {e}")
+        sys.exit(1)
+
+    # 控制台友好输出
+    logger.info(f"状态: {resp.status_code}")
+    if data:
+        try:
+            pretty = json.dumps(data, ensure_ascii=False, indent=2)
+            logger.info(f"响应 JSON:\n{pretty}")
+            print(pretty)
+        except Exception:
+            logger.info(f"响应文本: {resp.text}")
+            print(resp.text)
+    else:
+        logger.info(f"响应文本: {resp.text}")
+        print(resp.text)
+
+
+if __name__ == "__main__":
+    main()

+ 61 - 0
main.py

@@ -3,6 +3,10 @@
 from login_post import login
 from login_post import login
 from log_util import get_logger
 from log_util import get_logger
 from session_state import set_sess_key
 from session_state import set_sess_key
+from acl_post import get_acl_rules
+from add_acl_post import add_acl_rule
+from edit_acl_post import edit_acl_rule
+from del_acl_post import del_acl_rule
 
 
 
 
 def main():
 def main():
@@ -24,6 +28,63 @@ def main():
 		# 保存为全局会话状态,供其他接口调用时使用
 		# 保存为全局会话状态,供其他接口调用时使用
 		set_sess_key(sess_cookie)
 		set_sess_key(sess_cookie)
 		print(f"sess_key: {sess_cookie}")
 		print(f"sess_key: {sess_cookie}")
+
+		# 测试ACL接口
+		try:
+			acl_resp, acl_data = get_acl_rules()
+			logger.info(f"已调用: get_acl_rules() | 状态: {acl_resp.status_code}")
+			if acl_data:
+				logger.info(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}")
+				print(f"ACL规则总数: {acl_data.get('Data', {}).get('total', '未知')}")
+			else:
+				logger.warning("未获取到ACL数据")
+				print("未获取到ACL数据")
+		except Exception as e:
+			logger.exception(f"测试ACL接口失败: {e}")
+			print(f"测试ACL接口失败: {e}")
+
+		# 测试添加ACL规则
+		try:
+			add_resp, add_data = add_acl_rule(dst_addr="1.2.3.4,2.4.5.6", comment="remark_kkkkkk")
+			logger.info(f"已调用: add_acl_rule() | 状态: {add_resp.status_code}")
+			if add_data:
+				row_id = add_data.get('RowId')
+				logger.info(f"新增ACL规则ID: {row_id}")
+				print(f"新增ACL规则ID: {row_id}")
+			else:
+				logger.warning("未获取到添加结果")
+				print("未获取到添加结果")
+		except Exception as e:
+			logger.exception(f"测试添加ACL规则失败: {e}")
+			print(f"测试添加ACL规则失败: {e}")
+
+		# 测试编辑ACL规则
+		try:
+			edit_resp, edit_data = edit_acl_rule(rule_id=14, dst_addr="1.2.3.4", comment="remark_kkkkkk")
+			logger.info(f"已调用: edit_acl_rule() | 状态: {edit_resp.status_code}")
+			if edit_data:
+				logger.info("ACL规则编辑成功")
+				print("ACL规则编辑成功")
+			else:
+				logger.warning("未获取到编辑结果")
+				print("未获取到编辑结果")
+		except Exception as e:
+			logger.exception(f"测试编辑ACL规则失败: {e}")
+			print(f"测试编辑ACL规则失败: {e}")
+
+		# 测试删除ACL规则
+		try:
+			del_resp, del_data = del_acl_rule(rule_id=14)
+			logger.info(f"已调用: del_acl_rule() | 状态: {del_resp.status_code}")
+			if del_data:
+				logger.info("ACL规则删除成功")
+				print("ACL规则删除成功")
+			else:
+				logger.warning("未获取到删除结果")
+				print("未获取到删除结果")
+		except Exception as e:
+			logger.exception(f"测试删除ACL规则失败: {e}")
+			print(f"测试删除ACL规则失败: {e}")
 	else:
 	else:
 		logger.warning("未在响应中找到 sess_key")
 		logger.warning("未在响应中找到 sess_key")
 		print("未在响应中找到 sess_key")
 		print("未在响应中找到 sess_key")