Kaynağa Gözat

refactor(api): 重构ACL相关API调用模块

- 移除独立的ACL操作文件(acl_post.py、add_acl_post.py等)
- 合并ACL功能至统一模块acl_actions.py中
- 更新main.py导入路径以适应新的模块结构
- 移除session_state.py,其功能由auth_session.py接管
- 统一认证和会话管理逻辑,提升代码复用性与可维护性
mcbaiyun 2 ay önce
ebeveyn
işleme
1c4be2fe39
9 değiştirilmiş dosya ile 440 ekleme ve 622 silme
  1. 283 0
      acl_actions.py
  2. 0 101
      acl_post.py
  3. 0 141
      add_acl_post.py
  4. 155 0
      auth_session.py
  5. 0 96
      del_acl_post.py
  6. 0 146
      edit_acl_post.py
  7. 0 103
      login_post.py
  8. 2 6
      main.py
  9. 0 29
      session_state.py

+ 283 - 0
acl_actions.py

@@ -0,0 +1,283 @@
+import json
+import sys
+from pathlib import Path
+from typing import Optional, Tuple
+
+import requests
+
+from log_util import get_logger, log_request
+from auth_session import get_sess_key, load_config
+
+
+ROOT = Path(__file__).resolve().parent
+CONFIG_PATH = ROOT / "config.json"
+
+
+DEFAULT_SHOW_PAYLOAD = {
+    "func_name": "acl",
+    "action": "show",
+    "param": {
+        "TYPE": "total,data",
+        "limit": "0,20",
+        "ORDER_BY": "",
+        "ORDER": ""
+    }
+}
+
+
+def _make_cookies_from_sess_key(sess_key: str) -> dict:
+    # sess_key may be stored as value (no "sess_key=...;") or as "sess_key=...;".
+    val = sess_key.split("=", 1)[-1].rstrip(";")
+    return {"sess_key": val}
+
+
+def get_acl_rules(payload: dict | None = None, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/call"
+    data = payload or DEFAULT_SHOW_PAYLOAD
+    logger = get_logger("acl_post")
+
+    sess_key = get_sess_key()
+    if not sess_key:
+        raise ValueError("未找到 sess_key,请先登录")
+
+    cookies = _make_cookies_from_sess_key(sess_key)
+
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
+
+    try:
+        log_request(logger, "get_acl_rules", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    try:
+        json_data = resp.json()
+    except ValueError:
+        json_data = None
+
+    return resp, json_data
+
+
+def add_acl_rule(
+    dst_addr: str,
+    comment: str,
+    protocol: str = "any",
+    action: str = "drop",
+    dir: str = "forward",
+    ctdir: str = "0",
+    iinterface: str = "any",
+    ointerface: str = "any",
+    src_addr: str = "",
+    src_port: str = "",
+    dst_port: str = "",
+    enabled: str = "yes",
+    week: str = "1234567",
+    time: str = "00:00-23:59",
+    ip_type: str = "4",
+    src6_addr: str = "",
+    dst6_addr: str = "",
+    src6_mode: str = "",
+    dst6_mode: str = "",
+    src6_suffix: str = "",
+    dst6_suffix: str = "",
+    timeout: int = 10,
+) -> Tuple[requests.Response, Optional[dict]]:
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/call"
+    data = {
+        "func_name": "acl",
+        "action": "add",
+        "param": {
+            "protocol": protocol,
+            "action": action,
+            "dir": dir,
+            "ctdir": ctdir,
+            "iinterface": iinterface,
+            "ointerface": ointerface,
+            "src_addr": src_addr,
+            "dst_addr": dst_addr,
+            "src_port": src_port,
+            "dst_port": dst_port,
+            "comment": comment,
+            "enabled": enabled,
+            "week": week,
+            "time": time,
+            "ip_type": ip_type,
+            "src6_addr": src6_addr,
+            "dst6_addr": dst6_addr,
+            "src6_mode": src6_mode,
+            "dst6_mode": dst6_mode,
+            "src6_suffix": src6_suffix,
+            "dst6_suffix": dst6_suffix,
+        },
+    }
+    logger = get_logger("add_acl_post")
+
+    sess_key = get_sess_key()
+    if not sess_key:
+        raise ValueError("未找到 sess_key,请先登录")
+
+    cookies = _make_cookies_from_sess_key(sess_key)
+
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
+
+    try:
+        log_request(logger, "add_acl_rule", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    try:
+        json_data = resp.json()
+    except ValueError:
+        json_data = None
+
+    return resp, json_data
+
+
+def del_acl_rule(rule_id: int, timeout: int = 10) -> Tuple[requests.Response, Optional[dict]]:
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/call"
+    data = {"func_name": "acl", "action": "del", "param": {"id": rule_id}}
+    logger = get_logger("del_acl_post")
+
+    sess_key = get_sess_key()
+    if not sess_key:
+        raise ValueError("未找到 sess_key,请先登录")
+
+    cookies = _make_cookies_from_sess_key(sess_key)
+
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
+
+    try:
+        log_request(logger, "del_acl_rule", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    try:
+        json_data = resp.json()
+    except ValueError:
+        json_data = None
+
+    return resp, json_data
+
+
+def edit_acl_rule(
+    rule_id: int,
+    dst_addr: str = "",
+    comment: str = "",
+    protocol: str = "any",
+    action: str = "drop",
+    dir: str = "forward",
+    ctdir: int = 0,
+    iinterface: str = "any",
+    ointerface: str = "any",
+    src_addr: str = "",
+    src_port: str = "",
+    dst_port: str = "",
+    enabled: str = "yes",
+    week: str = "1234567",
+    time: str = "00:00-23:59",
+    ip_type: str = "4",
+    src6_addr: str = "",
+    dst6_addr: str = "",
+    src6_mode: int = 0,
+    dst6_mode: int = 0,
+    src6_suffix: str = "",
+    dst6_suffix: str = "",
+    src6_mac: str = "",
+    dst6_mac: str = "",
+    timeout: int = 10,
+) -> Tuple[requests.Response, Optional[dict]]:
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/call"
+    data = {
+        "func_name": "acl",
+        "action": "edit",
+        "param": {
+            "id": rule_id,
+            "protocol": protocol,
+            "action": action,
+            "dir": dir,
+            "ctdir": ctdir,
+            "iinterface": iinterface,
+            "ointerface": ointerface,
+            "src_addr": src_addr,
+            "dst_addr": dst_addr,
+            "src_port": src_port,
+            "dst_port": dst_port,
+            "comment": comment,
+            "enabled": enabled,
+            "week": week,
+            "time": time,
+            "ip_type": ip_type,
+            "src6_addr": src6_addr,
+            "dst6_addr": dst6_addr,
+            "src6_mode": src6_mode,
+            "dst6_mode": dst6_mode,
+            "src6_suffix": src6_suffix,
+            "dst6_suffix": dst6_suffix,
+            "src6_mac": src6_mac,
+            "dst6_mac": dst6_mac,
+        },
+    }
+    logger = get_logger("edit_acl_post")
+
+    sess_key = get_sess_key()
+    if not sess_key:
+        raise ValueError("未找到 sess_key,请先登录")
+
+    cookies = _make_cookies_from_sess_key(sess_key)
+
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
+
+    try:
+        log_request(logger, "edit_acl_rule", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    try:
+        json_data = resp.json()
+    except ValueError:
+        json_data = None
+
+    return resp, json_data
+
+
+def _print_response(resp: requests.Response, data: Optional[dict], logger_name: str = "main") -> None:
+    logger = get_logger(logger_name)
+    logger.info(f"状态: {resp.status_code}")
+    if data:
+        try:
+            pretty = json.dumps(data, ensure_ascii=False, indent=2)
+            logger.info(f"响应 JSON:\n{pretty}")
+            print(pretty)
+        except Exception:
+            logger.info(f"响应文本: {resp.text}")
+            print(resp.text)
+    else:
+        logger.info(f"响应文本: {resp.text}")
+        print(resp.text)
+
+
+if __name__ == "__main__":
+    # 示例:调用 get_acl_rules
+    try:
+        resp, data = get_acl_rules()
+        _print_response(resp, data)
+    except FileNotFoundError as e:
+        get_logger("main").error(f"配置错误: {e}")
+        sys.exit(2)
+    except ValueError as e:
+        get_logger("main").error(f"会话错误: {e}")
+        sys.exit(3)
+    except requests.RequestException as e:
+        get_logger("main").error(f"请求失败: {e}")
+        sys.exit(1)

+ 0 - 101
acl_post.py

@@ -1,101 +0,0 @@
-import json
-import sys
-from pathlib import Path
-
-import requests
-
-from log_util import get_logger, log_request
-from session_state import get_sess_key
-
-
-ROOT = Path(__file__).resolve().parent
-CONFIG_PATH = ROOT / "config.json"
-
-
-def load_config():
-    if not CONFIG_PATH.exists():
-        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
-    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
-        return json.load(f)
-
-
-DEFAULT_PAYLOAD = {
-    "func_name": "acl",
-    "action": "show",
-    "param": {
-        "TYPE": "total,data",
-        "limit": "0,20",
-        "ORDER_BY": "",
-        "ORDER": ""
-    }
-}
-
-
-def get_acl_rules(payload: dict | None = None, timeout: int = 10):
-    """Send ACL show POST. Returns requests.Response and parsed JSON data.
-
-    payload: JSON payload to send. If None, uses DEFAULT_PAYLOAD.
-    timeout: request timeout in seconds.
-    """
-    cfg = load_config()
-    base = cfg.get("base_url", "").rstrip("/")
-    url = f"{base}/Action/call"
-    data = payload or DEFAULT_PAYLOAD
-    logger = get_logger("acl_post")
-
-    # Get sess_key from global state
-    sess_key = get_sess_key()
-    if not sess_key:
-        raise ValueError("未找到 sess_key,请先登录")
-
-    cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")}  # Extract value from "sess_key=value;"
-
-    logger.debug(f"准备发送请求,URL: {url}")
-    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
-
-    # 记录请求/响应
-    try:
-        log_request(logger, "get_acl_rules", url, data, resp)
-    except Exception:
-        logger.exception("记录请求/响应失败")
-
-    # Parse JSON response
-    try:
-        json_data = resp.json()
-    except ValueError:
-        json_data = None
-
-    return resp, json_data
-
-
-def main():
-    logger = get_logger("main")
-    try:
-        resp, data = get_acl_rules()
-    except FileNotFoundError as e:
-        logger.error(f"配置错误: {e}")
-        sys.exit(2)
-    except ValueError as e:
-        logger.error(f"会话错误: {e}")
-        sys.exit(3)
-    except requests.RequestException as e:
-        logger.error(f"请求失败: {e}")
-        sys.exit(1)
-
-    # 控制台友好输出
-    logger.info(f"状态: {resp.status_code}")
-    if data:
-        try:
-            pretty = json.dumps(data, ensure_ascii=False, indent=2)
-            logger.info(f"响应 JSON:\n{pretty}")
-            print(pretty)
-        except Exception:
-            logger.info(f"响应文本: {resp.text}")
-            print(resp.text)
-    else:
-        logger.info(f"响应文本: {resp.text}")
-        print(resp.text)
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 141
add_acl_post.py

@@ -1,141 +0,0 @@
-import json
-import sys
-from pathlib import Path
-from typing import Optional
-
-import requests
-
-from log_util import get_logger, log_request
-from session_state import get_sess_key
-
-
-ROOT = Path(__file__).resolve().parent
-CONFIG_PATH = ROOT / "config.json"
-
-
-def load_config():
-    if not CONFIG_PATH.exists():
-        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
-    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
-        return json.load(f)
-
-
-def add_acl_rule(
-    dst_addr: str,
-    comment: str,
-    protocol: str = "any",
-    action: str = "drop",
-    dir: str = "forward",
-    ctdir: str = "0",
-    iinterface: str = "any",
-    ointerface: str = "any",
-    src_addr: str = "",
-    src_port: str = "",
-    dst_port: str = "",
-    enabled: str = "yes",
-    week: str = "1234567",
-    time: str = "00:00-23:59",
-    ip_type: str = "4",
-    src6_addr: str = "",
-    dst6_addr: str = "",
-    src6_mode: str = "",
-    dst6_mode: str = "",
-    src6_suffix: str = "",
-    dst6_suffix: str = "",
-    timeout: int = 10
-):
-    """Send ACL add POST. Returns requests.Response and parsed JSON data.
-
-    dst_addr: Comma-separated destination addresses, e.g., "1.2.3.4,2.4.5.6"
-    comment: Comment for the rule, e.g., "remark_kkkkkk"
-    Other parameters have defaults based on the example.
-    """
-    cfg = load_config()
-    base = cfg.get("base_url", "").rstrip("/")
-    url = f"{base}/Action/call"
-    data = {
-        "func_name": "acl",
-        "action": "add",
-        "param": {
-            "protocol": protocol,
-            "action": action,
-            "dir": dir,
-            "ctdir": ctdir,
-            "iinterface": iinterface,
-            "ointerface": ointerface,
-            "src_addr": src_addr,
-            "dst_addr": dst_addr,
-            "src_port": src_port,
-            "dst_port": dst_port,
-            "comment": comment,
-            "enabled": enabled,
-            "week": week,
-            "time": time,
-            "ip_type": ip_type,
-            "src6_addr": src6_addr,
-            "dst6_addr": dst6_addr,
-            "src6_mode": src6_mode,
-            "dst6_mode": dst6_mode,
-            "src6_suffix": src6_suffix,
-            "dst6_suffix": dst6_suffix
-        }
-    }
-    logger = get_logger("add_acl_post")
-
-    # Get sess_key from global state
-    sess_key = get_sess_key()
-    if not sess_key:
-        raise ValueError("未找到 sess_key,请先登录")
-
-    cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")}  # Extract value from "sess_key=value;"
-
-    logger.debug(f"准备发送请求,URL: {url}")
-    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
-
-    # 记录请求/响应
-    try:
-        log_request(logger, "add_acl_rule", url, data, resp)
-    except Exception:
-        logger.exception("记录请求/响应失败")
-
-    # Parse JSON response
-    try:
-        json_data = resp.json()
-    except ValueError:
-        json_data = None
-
-    return resp, json_data
-
-
-def main():
-    logger = get_logger("main")
-    try:
-        # Example: Add ACL rule to drop traffic to 1.2.3.4 and 2.4.5.6 with comment "remark_kkkkkk"
-        resp, data = add_acl_rule(dst_addr="1.2.3.4,2.4.5.6", comment="remark_kkkkkk")
-    except FileNotFoundError as e:
-        logger.error(f"配置错误: {e}")
-        sys.exit(2)
-    except ValueError as e:
-        logger.error(f"会话错误: {e}")
-        sys.exit(3)
-    except requests.RequestException as e:
-        logger.error(f"请求失败: {e}")
-        sys.exit(1)
-
-    # 控制台友好输出
-    logger.info(f"状态: {resp.status_code}")
-    if data:
-        try:
-            pretty = json.dumps(data, ensure_ascii=False, indent=2)
-            logger.info(f"响应 JSON:\n{pretty}")
-            print(pretty)
-        except Exception:
-            logger.info(f"响应文本: {resp.text}")
-            print(resp.text)
-    else:
-        logger.info(f"响应文本: {resp.text}")
-        print(resp.text)
-
-
-if __name__ == "__main__":
-    main()

+ 155 - 0
auth_session.py

@@ -0,0 +1,155 @@
+"""Combined authentication and in-memory session state.
+
+This module merges `session_state.py` and `login_post.py` into a single
+convenient file. It exposes the session-state helpers:
+
+- set_sess_key(key: str) -> None
+- get_sess_key() -> Optional[str]
+- clear_sess_key() -> None
+
+and the `login` function which performs the POST request and stores the
+`sess_key` (if found) into the in-memory session state.
+
+It also provides a `main()` function intended for CLI use.
+"""
+from __future__ import annotations
+
+import json
+import re
+import sys
+from pathlib import Path
+from threading import Lock
+from typing import Optional, Tuple
+
+import requests
+
+from log_util import get_logger, log_request
+
+
+ROOT = Path(__file__).resolve().parent
+CONFIG_PATH = ROOT / "config.json"
+
+
+# --- simple thread-safe in-memory session state ---
+_lock = Lock()
+_sess_key: Optional[str] = None
+
+
+def set_sess_key(key: str) -> None:
+    """Set global sess_key (thread-safe)."""
+    global _sess_key
+    with _lock:
+        _sess_key = key
+
+
+def get_sess_key() -> Optional[str]:
+    """Get current sess_key, or None if not set (thread-safe)."""
+    with _lock:
+        return _sess_key
+
+
+def clear_sess_key() -> None:
+    """Clear current sess_key (thread-safe)."""
+    global _sess_key
+    with _lock:
+        _sess_key = None
+
+
+# --- login/request helpers ---
+def load_config() -> dict:
+    if not CONFIG_PATH.exists():
+        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
+    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+DEFAULT_PAYLOAD = {
+    "username": "xiaobai",
+    "passwd": "dc81b4427df07fd6b3ebcb05a7b34daf",
+    "pass": "c2FsdF8xMXhpYW9iYWku",
+    "remember_password": "",
+}
+
+
+def _extract_sess_cookie_from_response(resp: requests.Response) -> Optional[str]:
+    """Try to extract sess_key cookie string from response.
+
+    Returns a cookie string like 'sess_key=...;' if found, else None.
+    """
+    try:
+        sess_val = resp.cookies.get("sess_key")
+        if sess_val:
+            return f"sess_key={sess_val};"
+        set_cookie = resp.headers.get("Set-Cookie", "")
+        m = re.search(r"(sess_key=[^;]+;?)", set_cookie)
+        if m:
+            return m.group(1)
+    except Exception:
+        return None
+    return None
+
+
+def login(payload: dict | None = None, timeout: int = 10) -> Tuple[requests.Response, Optional[str]]:
+    """Send login POST. Returns (response, sess_cookie_or_none).
+
+    The function will also set the in-memory sess_key if it can be extracted.
+    """
+    cfg = load_config()
+    base = cfg.get("base_url", "").rstrip("/")
+    url = f"{base}/Action/login"
+    data = payload or DEFAULT_PAYLOAD
+    logger = get_logger("login_post")
+    logger.debug(f"准备发送请求,URL: {url}")
+    resp = requests.post(url, json=data, timeout=timeout)
+
+    sess_cookie = _extract_sess_cookie_from_response(resp)
+    if sess_cookie:
+        # Write sess_key into in-memory session state (strip trailing semicolon)
+        cookie_val = sess_cookie.split("=", 1)[1].rstrip(";")
+        try:
+            set_sess_key(cookie_val)
+            logger.info("sess_key 已保存到内存状态")
+        except Exception:
+            logger.exception("保存 sess_key 失败")
+
+    # Log request/response but do not fail on logging errors
+    try:
+        log_request(logger, "login", url, data, resp)
+    except Exception:
+        logger.exception("记录请求/响应失败")
+
+    return resp, sess_cookie
+
+
+def main() -> None:
+    logger = get_logger("main")
+    try:
+        resp, sess_cookie = login()
+    except FileNotFoundError as e:
+        logger.error(f"配置错误: {e}")
+        sys.exit(2)
+    except requests.RequestException as e:
+        logger.error(f"请求失败: {e}")
+        sys.exit(1)
+
+    logger.info(f"状态: {resp.status_code}")
+    content_type = resp.headers.get("Content-Type", "")
+    if "application/json" in content_type:
+        try:
+            pretty = json.dumps(resp.json(), ensure_ascii=False, indent=2)
+            logger.info(f"响应 JSON:\n{pretty}")
+            print(pretty)
+        except ValueError:
+            logger.info(f"响应文本: {resp.text}")
+            print(resp.text)
+    else:
+        logger.info(f"响应文本: {resp.text}")
+        print(resp.text)
+
+    if sess_cookie:
+        logger.info(f"提取到 sess_key: {sess_cookie}")
+        print(f"sess_key: {sess_cookie}")
+
+
+if __name__ == "__main__":
+    main()

+ 0 - 96
del_acl_post.py

@@ -1,96 +0,0 @@
-import json
-import sys
-from pathlib import Path
-from typing import Optional
-
-import requests
-
-from log_util import get_logger, log_request
-from session_state import get_sess_key
-
-
-ROOT = Path(__file__).resolve().parent
-CONFIG_PATH = ROOT / "config.json"
-
-
-def load_config():
-    if not CONFIG_PATH.exists():
-        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
-    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
-        return json.load(f)
-
-
-def del_acl_rule(rule_id: int, timeout: int = 10):
-    """Send ACL delete POST. Returns requests.Response and parsed JSON data.
-
-    rule_id: The ID of the ACL rule to delete (required).
-    """
-    cfg = load_config()
-    base = cfg.get("base_url", "").rstrip("/")
-    url = f"{base}/Action/call"
-    data = {
-        "func_name": "acl",
-        "action": "del",
-        "param": {
-            "id": rule_id
-        }
-    }
-    logger = get_logger("del_acl_post")
-
-    # Get sess_key from global state
-    sess_key = get_sess_key()
-    if not sess_key:
-        raise ValueError("未找到 sess_key,请先登录")
-
-    cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")}  # Extract value from "sess_key=value;"
-
-    logger.debug(f"准备发送请求,URL: {url}")
-    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
-
-    # 记录请求/响应
-    try:
-        log_request(logger, "del_acl_rule", url, data, resp)
-    except Exception:
-        logger.exception("记录请求/响应失败")
-
-    # Parse JSON response
-    try:
-        json_data = resp.json()
-    except ValueError:
-        json_data = None
-
-    return resp, json_data
-
-
-def main():
-    logger = get_logger("main")
-    try:
-        # Example: Delete ACL rule with ID 14
-        resp, data = del_acl_rule(rule_id=14)
-    except FileNotFoundError as e:
-        logger.error(f"配置错误: {e}")
-        sys.exit(2)
-    except ValueError as e:
-        logger.error(f"会话错误: {e}")
-        sys.exit(3)
-    except requests.RequestException as e:
-        logger.error(f"请求失败: {e}")
-        sys.exit(1)
-
-    # 控制台友好输出
-    logger.info(f"状态: {resp.status_code}")
-    if data:
-        try:
-            pretty = json.dumps(data, ensure_ascii=False, indent=2)
-            logger.info(f"响应 JSON:\n{pretty}")
-            print(pretty)
-        except Exception:
-            logger.info(f"响应文本: {resp.text}")
-            print(resp.text)
-    else:
-        logger.info(f"响应文本: {resp.text}")
-        print(resp.text)
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 146
edit_acl_post.py

@@ -1,146 +0,0 @@
-import json
-import sys
-from pathlib import Path
-from typing import Optional
-
-import requests
-
-from log_util import get_logger, log_request
-from session_state import get_sess_key
-
-
-ROOT = Path(__file__).resolve().parent
-CONFIG_PATH = ROOT / "config.json"
-
-
-def load_config():
-    if not CONFIG_PATH.exists():
-        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
-    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
-        return json.load(f)
-
-
-def edit_acl_rule(
-    rule_id: int,
-    dst_addr: str = "",
-    comment: str = "",
-    protocol: str = "any",
-    action: str = "drop",
-    dir: str = "forward",
-    ctdir: int = 0,
-    iinterface: str = "any",
-    ointerface: str = "any",
-    src_addr: str = "",
-    src_port: str = "",
-    dst_port: str = "",
-    enabled: str = "yes",
-    week: str = "1234567",
-    time: str = "00:00-23:59",
-    ip_type: str = "4",
-    src6_addr: str = "",
-    dst6_addr: str = "",
-    src6_mode: int = 0,
-    dst6_mode: int = 0,
-    src6_suffix: str = "",
-    dst6_suffix: str = "",
-    src6_mac: str = "",
-    dst6_mac: str = "",
-    timeout: int = 10
-):
-    """Send ACL edit POST. Returns requests.Response and parsed JSON data.
-
-    rule_id: The ID of the ACL rule to edit (required).
-    Other parameters are optional and will update the rule if provided.
-    """
-    cfg = load_config()
-    base = cfg.get("base_url", "").rstrip("/")
-    url = f"{base}/Action/call"
-    data = {
-        "func_name": "acl",
-        "action": "edit",
-        "param": {
-            "id": rule_id,
-            "protocol": protocol,
-            "action": action,
-            "dir": dir,
-            "ctdir": ctdir,
-            "iinterface": iinterface,
-            "ointerface": ointerface,
-            "src_addr": src_addr,
-            "dst_addr": dst_addr,
-            "src_port": src_port,
-            "dst_port": dst_port,
-            "comment": comment,
-            "enabled": enabled,
-            "week": week,
-            "time": time,
-            "ip_type": ip_type,
-            "src6_addr": src6_addr,
-            "dst6_addr": dst6_addr,
-            "src6_mode": src6_mode,
-            "dst6_mode": dst6_mode,
-            "src6_suffix": src6_suffix,
-            "dst6_suffix": dst6_suffix,
-            "src6_mac": src6_mac,
-            "dst6_mac": dst6_mac
-        }
-    }
-    logger = get_logger("edit_acl_post")
-
-    # Get sess_key from global state
-    sess_key = get_sess_key()
-    if not sess_key:
-        raise ValueError("未找到 sess_key,请先登录")
-
-    cookies = {"sess_key": sess_key.split("=")[1].rstrip(";")}  # Extract value from "sess_key=value;"
-
-    logger.debug(f"准备发送请求,URL: {url}")
-    resp = requests.post(url, json=data, cookies=cookies, timeout=timeout)
-
-    # 记录请求/响应
-    try:
-        log_request(logger, "edit_acl_rule", url, data, resp)
-    except Exception:
-        logger.exception("记录请求/响应失败")
-
-    # Parse JSON response
-    try:
-        json_data = resp.json()
-    except ValueError:
-        json_data = None
-
-    return resp, json_data
-
-
-def main():
-    logger = get_logger("main")
-    try:
-        # Example: Edit ACL rule with ID 14, change dst_addr to "1.2.3.4" and comment to "remark_kkkkkk"
-        resp, data = edit_acl_rule(rule_id=14, dst_addr="1.2.3.4", comment="remark_kkkkkk")
-    except FileNotFoundError as e:
-        logger.error(f"配置错误: {e}")
-        sys.exit(2)
-    except ValueError as e:
-        logger.error(f"会话错误: {e}")
-        sys.exit(3)
-    except requests.RequestException as e:
-        logger.error(f"请求失败: {e}")
-        sys.exit(1)
-
-    # 控制台友好输出
-    logger.info(f"状态: {resp.status_code}")
-    if data:
-        try:
-            pretty = json.dumps(data, ensure_ascii=False, indent=2)
-            logger.info(f"响应 JSON:\n{pretty}")
-            print(pretty)
-        except Exception:
-            logger.info(f"响应文本: {resp.text}")
-            print(resp.text)
-    else:
-        logger.info(f"响应文本: {resp.text}")
-        print(resp.text)
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 103
login_post.py

@@ -1,103 +0,0 @@
-import json
-import sys
-from pathlib import Path
-
-import requests
-
-from log_util import get_logger, log_request
-
-
-ROOT = Path(__file__).resolve().parent
-CONFIG_PATH = ROOT / "config.json"
-
-
-def load_config():
-    if not CONFIG_PATH.exists():
-        raise FileNotFoundError(f"配置文件未找到: {CONFIG_PATH}")
-    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
-        return json.load(f)
-
-
-DEFAULT_PAYLOAD = {
-    "username": "xiaobai",
-    "passwd": "dc81b4427df07fd6b3ebcb05a7b34daf",
-    "pass": "c2FsdF8xMXhpYW9iYWku",
-    "remember_password": ""
-}
-
-
-def login(payload: dict | None = None, timeout: int = 10):
-    """Send login POST. Returns requests.Response.
-
-    payload: JSON payload to send. If None, uses DEFAULT_PAYLOAD.
-    timeout: request timeout in seconds.
-    """
-    cfg = load_config()
-    base = cfg.get("base_url", "").rstrip("/")
-    url = f"{base}/Action/login"
-    data = payload or DEFAULT_PAYLOAD
-    logger = get_logger("login_post")
-    logger.debug(f"准备发送请求,URL: {url}")
-    resp = requests.post(url, json=data, timeout=timeout)
-
-    # Try to extract sess_key from cookies or Set-Cookie header.
-    sess_cookie = None
-    try:
-        # requests exposes cookies in resp.cookies
-        sess_val = resp.cookies.get("sess_key")
-        if sess_val:
-            sess_cookie = f"sess_key={sess_val};"
-        else:
-            set_cookie = resp.headers.get("Set-Cookie", "")
-            import re
-
-            m = re.search(r"(sess_key=[^;]+;?)", set_cookie)
-            if m:
-                sess_cookie = m.group(1)
-    except Exception:
-        # Non-fatal: if extraction fails, return None for sess_cookie
-        sess_cookie = None
-
-    # 记录请求/响应
-    try:
-        log_request(logger, "login", url, data, resp)
-    except Exception:
-        logger.exception("记录请求/响应失败")
-
-    return resp, sess_cookie
-
-
-def main():
-    logger = get_logger("main")
-    try:
-        resp, sess_cookie = login()
-    except FileNotFoundError as e:
-        logger.error(f"配置错误: {e}")
-        sys.exit(2)
-    except requests.RequestException as e:
-        logger.error(f"请求失败: {e}")
-        sys.exit(1)
-    # 控制台友好输出
-    logger.info(f"状态: {resp.status_code}")
-    # Try to pretty-print JSON if possible
-    content_type = resp.headers.get("Content-Type", "")
-    if "application/json" in content_type:
-        try:
-            pretty = json.dumps(resp.json(), ensure_ascii=False, indent=2)
-            logger.info(f"响应 JSON:\n{pretty}")
-            print(pretty)
-        except ValueError:
-            logger.info(f"响应文本: {resp.text}")
-            print(resp.text)
-    else:
-        logger.info(f"响应文本: {resp.text}")
-        print(resp.text)
-
-    if sess_cookie:
-        # Print the raw sess_key cookie string
-        logger.info(f"提取到 sess_key: {sess_cookie}")
-        print(f"sess_key: {sess_cookie}")
-
-
-if __name__ == "__main__":
-    main()

+ 2 - 6
main.py

@@ -1,12 +1,8 @@
 """Entry point to call various API actions."""
 """Entry point to call various API actions."""
 
 
-from login_post import login
+from auth_session import login, set_sess_key
 from log_util import get_logger
 from log_util import get_logger
-from session_state import set_sess_key
-from acl_post import get_acl_rules
-from add_acl_post import add_acl_rule
-from edit_acl_post import edit_acl_rule
-from del_acl_post import del_acl_rule
+from acl_actions import get_acl_rules, add_acl_rule, edit_acl_rule, del_acl_rule
 
 
 
 
 def main():
 def main():

+ 0 - 29
session_state.py

@@ -1,29 +0,0 @@
-"""Simple thread-safe in-memory session state for storing sess_key.
-
-提供 set_sess_key/get_sess_key/clear_sess_key 接口,供其他模块在调用 API 时获取会话 key。
-"""
-from threading import Lock
-from typing import Optional
-
-_lock = Lock()
-_sess_key: Optional[str] = None
-
-
-def set_sess_key(key: str) -> None:
-    """设置全局 sess_key(线程安全)。"""
-    global _sess_key
-    with _lock:
-        _sess_key = key
-
-
-def get_sess_key() -> Optional[str]:
-    """获取当前的 sess_key,如果没有则返回 None(线程安全)。"""
-    with _lock:
-        return _sess_key
-
-
-def clear_sess_key() -> None:
-    """清除当前 sess_key(线程安全)。"""
-    global _sess_key
-    with _lock:
-        _sess_key = None