Przeglądaj źródła

feat(acl): 重构ACL规则接口调用逻辑

- 统一使用 get_base_url() 获取基础URL,避免重复加载配置
- 移除冗余的 load_config() 调用,提高代码复用性
- 新增 analysis_connections.py 模块用于连接监控
- 实现 monitor_lanip 接口以分析局域网IP连接情况
- 在 main.py 中添加 monitor_lanip 接口调用示例
- 增强 auth_session.py 提供 get_host() 与 get_base_url() 工具函数
mcbaiyun 2 miesięcy temu
rodzic
commit
c7eedcd246
4 zmienionych plików z 152 dodań i 9 usunięć
  1. 5 9
      acl_actions.py
  2. 112 0
      analysis_connections.py
  3. 20 0
      auth_session.py
  4. 15 0
      main.py

+ 5 - 9
acl_actions.py

@@ -6,7 +6,7 @@ from typing import Optional, Tuple
 import requests
 
 from log_util import get_logger, log_request
-from auth_session import get_sess_key, load_config
+from auth_session import get_sess_key, get_base_url
 
 
 ROOT = Path(__file__).resolve().parent
@@ -32,8 +32,7 @@ def _make_cookies_from_sess_key(sess_key: str) -> dict:
 
 
 def get_acl_rules(payload: dict | None = None, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
-    cfg = load_config()
-    base = cfg.get("base_url", "").rstrip("/")
+    base = get_base_url()
     url = f"{base}/Action/call"
     data = payload or DEFAULT_SHOW_PAYLOAD
     logger = get_logger("acl_post")
@@ -84,8 +83,7 @@ def add_acl_rule(
     dst6_suffix: str = "",
     timeout: int = 10,
 ) -> Tuple[requests.Response, Optional[dict]]:
-    cfg = load_config()
-    base = cfg.get("base_url", "").rstrip("/")
+    base = get_base_url()
     url = f"{base}/Action/call"
     data = {
         "func_name": "acl",
@@ -139,8 +137,7 @@ def add_acl_rule(
 
 
 def del_acl_rule(rule_id: int, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
-    cfg = load_config()
-    base = cfg.get("base_url", "").rstrip("/")
+    base = get_base_url()
     url = f"{base}/Action/call"
     data = {"func_name": "acl", "action": "del", "param": {"id": rule_id}}
     logger = get_logger("del_acl_post")
@@ -194,8 +191,7 @@ def edit_acl_rule(
     dst6_mac: str = "",
     timeout: int = 10,
 ) -> Tuple[requests.Response, Optional[dict]]:
-    cfg = load_config()
-    base = cfg.get("base_url", "").rstrip("/")
+    base = get_base_url()
     url = f"{base}/Action/call"
     data = {
         "func_name": "acl",

+ 112 - 0
analysis_connections.py

@@ -0,0 +1,112 @@
+
+import json
+import sys
+from pathlib import Path
+from typing import Optional, Tuple
+
+import requests
+
+from log_util import get_logger, log_request
+from auth_session import load_config, get_sess_key, get_base_url, get_host
+
+
+ROOT = Path(__file__).resolve().parent
+CONFIG_PATH = ROOT / "config.json"
+
+
+def monitor_lanip(ip: str, interface: str = "all", proto: str = "all", maxnum: int = 500, limit: str = "0,20", timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
+	"""Call the monitor_lanip action and return (resp, json_data).
+
+	Example payload:
+	{"func_name":"monitor_lanip","action":"show","param":{"TYPE":"conn,conn_num","ip":"10.8.7.2","interface":"all","proto":"all","maxnum":500,"limit":"0,20","ORDER_BY":"","ORDER":""}}
+	"""
+	cfg = load_config()
+	base = cfg.get("base_url", "").rstrip("/")
+	url = f"{base}/Action/call"
+
+	data = {
+		"func_name": "monitor_lanip",
+		"action": "show",
+		"param": {
+			"TYPE": "conn,conn_num",
+			"ip": ip,
+			"interface": interface,
+			"proto": proto,
+			"maxnum": maxnum,
+			"limit": limit,
+			"ORDER_BY": "",
+			"ORDER": "",
+		},
+	}
+
+	logger = get_logger("analysis_connections")
+
+	sess_key = get_sess_key()
+	if not sess_key:
+		raise ValueError("未找到 sess_key,请先登录")
+
+	cookies = {"sess_key": sess_key.split("=", 1)[-1].rstrip(";")}
+
+	# Prepare headers including Origin/Referer/Host derived from config
+	base = get_base_url()
+	host = get_host()
+	headers = {
+		"Origin": base + "/",
+		"Referer": base + "/",
+		"Host": host,
+		"User-Agent": "python-requests/advanced-client",
+		"Accept": "application/json, text/plain, */*",
+		"Content-Type": "application/json;charset=UTF-8",
+	}
+
+	logger.debug(f"准备发送请求,URL: {url} payload: {data} headers: {headers}")
+	resp = requests.post(url, json=data, cookies=cookies, headers=headers, timeout=timeout)
+
+	try:
+		log_request(logger, "monitor_lanip", url, data, resp)
+	except Exception:
+		logger.exception("记录请求/响应失败")
+
+	try:
+		json_data = resp.json()
+	except ValueError:
+		json_data = None
+
+	return resp, json_data
+
+
+def main():
+	logger = get_logger("main")
+	if len(sys.argv) < 2:
+		print("用法: python analysis_connections.py <ip>")
+		sys.exit(2)
+	ip = sys.argv[1]
+	try:
+		resp, data = monitor_lanip(ip)
+	except FileNotFoundError as e:
+		logger.error(f"配置错误: {e}")
+		sys.exit(2)
+	except ValueError as e:
+		logger.error(f"会话错误: {e}")
+		sys.exit(3)
+	except requests.RequestException as e:
+		logger.error(f"请求失败: {e}")
+		sys.exit(1)
+
+	logger.info(f"状态: {resp.status_code}")
+	if data:
+		try:
+			pretty = json.dumps(data, ensure_ascii=False, indent=2)
+			logger.info(f"响应 JSON:\n{pretty}")
+			print(pretty)
+		except Exception:
+			logger.info(f"响应文本: {resp.text}")
+			print(resp.text)
+	else:
+		logger.info(f"响应文本: {resp.text}")
+		print(resp.text)
+
+
+if __name__ == "__main__":
+	main()
+

+ 20 - 0
auth_session.py

@@ -63,6 +63,26 @@ def load_config() -> dict:
         return json.load(f)
 
 
+def get_base_url() -> str:
+    """Return base_url from config (ensures no trailing slash)."""
+    cfg = load_config()
+    return cfg.get("base_url", "").rstrip("/")
+
+
+def get_host() -> str:
+    """Return host:port (netloc) parsed from base_url.
+
+    Example: 'http://8.210.76.176:65531/' -> '8.210.76.176:65531'
+    """
+    from urllib.parse import urlparse
+
+    base = get_base_url()
+    if not base:
+        return ""
+    parsed = urlparse(base)
+    return parsed.netloc
+
+
 DEFAULT_PAYLOAD = {
     "username": "xiaobai",
     "passwd": "dc81b4427df07fd6b3ebcb05a7b34daf",

+ 15 - 0
main.py

@@ -3,7 +3,9 @@
 from auth_session import login, set_sess_key
 from log_util import get_logger
 from acl_actions import get_acl_rules, add_acl_rule, edit_acl_rule, del_acl_rule
+import json
 from advanced_acl import get_all_test_ips, add_ip, del_ip
+from analysis_connections import monitor_lanip
 
 
 def main():
@@ -131,6 +133,19 @@ def main():
 		# except Exception as e:
 		# 	logger.exception(f"测试删除ACL规则失败: {e}")
 		# 	print(f"测试删除ACL规则失败: {e}")
+
+		# 6) 演示 monitor_lanip 接口
+		try:
+			conn_ip = "10.8.7.2"
+			resp_conn, data_conn = monitor_lanip(conn_ip)
+			logger.info(f"已调用: monitor_lanip({conn_ip}) | 状态: {resp_conn.status_code}")
+			if data_conn:
+				print(json.dumps(data_conn, ensure_ascii=False, indent=2))
+			else:
+				print(resp_conn.text)
+		except Exception as e:
+			logger.exception(f"测试 monitor_lanip 失败: {e}")
+			print(f"测试 monitor_lanip 失败: {e}")
 	else:
 		logger.warning("未在响应中找到 sess_key")
 		print("未在响应中找到 sess_key")